You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 20:28:49 UTC
[1/3] GIRAPH-908: support for partitioned input in giraph (pavanka)
Repository: giraph
Updated Branches:
refs/heads/trunk 535a333b7 -> 4a133f576
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
new file mode 100644
index 0000000..fc9f9d3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.input.parser.Records;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Long VertexId, Byte MappingTarget implementation of HiveToMapping
+ */
+public class LongByteHiveToMapping extends SimpleHiveToMapping<LongWritable,
+ ByteWritable> {
+
+ @Override
+ public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Records.verifyType(0, HiveType.LONG, schema);
+ Records.verifyType(1, HiveType.BYTE, schema);
+ }
+
+ @Override
+ public LongWritable getVertexId(HiveReadableRecord record) {
+ LongWritable reusableId = getReusableVertexId();
+ reusableId.set(record.getLong(0));
+ return reusableId;
+ }
+
+ @Override
+ public ByteWritable getMappingTarget(HiveReadableRecord record) {
+ ByteWritable reusableTarget = getReusableMappingTarget();
+ reusableTarget.set(record.getByte(1));
+ return reusableTarget;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
new file mode 100644
index 0000000..617bc4f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.input.parser.Records;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Iterator;
+
+/**
+ * Long VertexId, Int mapping target to Byte MappingTarget
+ * implementation of HiveToMapping
+ *
+ * The input table has a long id (column 0) and an int bucket value
+ * (column 1). The bucket value is reduced modulo the number of workers
+ * and stored as the byte mapping target.
+ */
+public class LongInt2ByteHiveToMapping extends SimpleHiveToMapping<LongWritable,
+ ByteWritable> {
+
+ /** Number of workers for the job */
+ private int numWorkers = 0;
+
+ /**
+ * Initialize the records and cache the maximum number of workers read
+ * from the configuration.
+ *
+ * @param records Hive records, passed on to the parent implementation
+ * @throws IllegalStateException if the worker count is not > 0 and < 255
+ */
+ @Override
+ public void initializeRecords(Iterator<HiveReadableRecord> records) {
+ super.initializeRecords(records);
+ numWorkers = getConf().getMaxWorkers();
+ if (numWorkers <= 0 || numWorkers >= 255) {
+ throw new IllegalStateException("#workers should be > 0 & < 255");
+ }
+ }
+
+ /**
+ * Verify that column 0 is a LONG and column 1 is an INT.
+ *
+ * @param inputDesc Hive input description (not used by this check)
+ * @param schema schema of the table being read
+ */
+ @Override
+ public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Records.verifyType(0, HiveType.LONG, schema);
+ Records.verifyType(1, HiveType.INT, schema);
+ }
+
+ /**
+ * Read the vertex id of a record into the reusable vertex id object.
+ *
+ * @param record Hive record to read
+ * @return the reusable vertex id, set to the long in column 0
+ */
+ @Override
+ public LongWritable getVertexId(HiveReadableRecord record) {
+ long id = record.getLong(0);
+ LongWritable reusableId = getReusableVertexId();
+ reusableId.set(id);
+ return reusableId;
+ }
+
+ /**
+ * Translate the int bucket value in column 1 into a byte mapping target
+ * by taking it modulo the number of workers.
+ *
+ * @param record Hive record to read
+ * @return the reusable mapping target, set to (target % numWorkers)
+ * @throws IllegalStateException if the remainder is not in 0..255, which
+ * happens for negative bucket values
+ */
+ @Override
+ public ByteWritable getMappingTarget(HiveReadableRecord record) {
+ int target = record.getInt(1);
+ ByteWritable reusableTarget = getReusableMappingTarget();
+ int bVal = target % numWorkers;
+ // initializeRecords enforces numWorkers < 255, so the remainder can only
+ // fall outside 0..255 by being negative, i.e. for a negative bucket value
+ if ((bVal >>> 8) != 0) {
+ throw new IllegalStateException("target % numWorkers is outside the " +
+ "0-255 byte range (target=" + target + ", numWorkers=" +
+ numWorkers + "); negative targets are not supported");
+ }
+ reusableTarget.set((byte) bVal);
+ return reusableTarget;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
new file mode 100644
index 0000000..41afed6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Sample implementations of HiveToMapping interface
+ */
+package org.apache.giraph.hive.input.mapping.examples;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
new file mode 100644
index 0000000..c7ad2a3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive Mapping input related classes
+ */
+package org.apache.giraph.hive.input.mapping;
[3/3] git commit: updated refs/heads/trunk to 4a133f5
Posted by pa...@apache.org.
GIRAPH-908: support for partitioned input in giraph (pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a133f57
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a133f57
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a133f57
Branch: refs/heads/trunk
Commit: 4a133f5766c09362917e0416af503c0a00b24e87
Parents: 535a333
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 8 10:36:03 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 8 10:36:03 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/bsp/BspService.java | 208 +++++++++++++-----
.../giraph/bsp/CentralizedServiceMaster.java | 10 +
.../org/apache/giraph/conf/GiraphClasses.java | 24 ++-
.../org/apache/giraph/conf/GiraphConstants.java | 32 +++
.../ImmutableClassesGiraphConfiguration.java | 165 +++++++++++++++
.../apache/giraph/io/MappingInputFormat.java | 64 ++++++
.../org/apache/giraph/io/MappingReader.java | 124 +++++++++++
.../io/internal/WrappedMappingInputFormat.java | 99 +++++++++
.../io/internal/WrappedMappingReader.java | 105 ++++++++++
.../io/iterables/MappingReaderWrapper.java | 95 +++++++++
.../giraph/mapping/AbstractLongByteOps.java | 60 ++++++
.../mapping/DefaultEmbeddedLongByteOps.java | 73 +++++++
.../giraph/mapping/DefaultLongByteOps.java | 57 +++++
.../giraph/mapping/LongByteMappingStore.java | 143 +++++++++++++
.../org/apache/giraph/mapping/MappingEntry.java | 62 ++++++
.../org/apache/giraph/mapping/MappingStore.java | 70 +++++++
.../apache/giraph/mapping/MappingStoreOps.java | 72 +++++++
.../org/apache/giraph/mapping/package-info.java | 23 ++
.../translate/LongByteTranslateEdge.java | 123 +++++++++++
.../giraph/mapping/translate/TranslateEdge.java | 57 +++++
.../giraph/mapping/translate/package-info.java | 22 ++
.../apache/giraph/master/BspServiceMaster.java | 22 +-
.../org/apache/giraph/master/MasterThread.java | 5 +-
.../partition/GraphPartitionerFactory.java | 10 +-
.../partition/HashPartitionerFactory.java | 24 +--
.../partition/HashRangePartitionerFactory.java | 24 +--
.../LongMappingStorePartitionerFactory.java | 61 ++++++
.../SimpleIntRangePartitionerFactory.java | 18 +-
.../SimpleLongRangePartitionerFactory.java | 18 +-
.../partition/SimplePartitionerFactory.java | 37 ++--
.../partition/SimpleWorkerPartitioner.java | 34 ++-
.../apache/giraph/worker/BspServiceWorker.java | 138 +++++++++++-
.../giraph/worker/EdgeInputSplitsCallable.java | 23 ++
.../giraph/worker/FullInputSplitCallable.java | 210 +++++++++++++++++++
.../org/apache/giraph/worker/LocalData.java | 93 ++++++++
.../worker/MappingInputSplitsCallable.java | 109 ++++++++++
.../MappingInputSplitsCallableFactory.java | 96 +++++++++
.../worker/VertexInputSplitsCallable.java | 59 ++++++
.../SimpleRangePartitionFactoryTest.java | 2 +
.../apache/giraph/hive/HiveGiraphRunner.java | 73 +++++++
.../giraph/hive/common/GiraphHiveConstants.java | 4 +
.../apache/giraph/hive/common/HiveUtils.java | 28 +++
.../input/mapping/AbstractHiveToMapping.java | 39 ++++
.../input/mapping/HiveMappingInputFormat.java | 116 ++++++++++
.../hive/input/mapping/HiveMappingReader.java | 100 +++++++++
.../hive/input/mapping/HiveToMapping.java | 44 ++++
.../hive/input/mapping/SimpleHiveToMapping.java | 105 ++++++++++
.../mapping/examples/LongByteHiveToMapping.java | 56 +++++
.../examples/LongInt2ByteHiveToMapping.java | 81 +++++++
.../input/mapping/examples/package-info.java | 22 ++
.../giraph/hive/input/mapping/package-info.java | 22 ++
52 files changed, 3239 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a0e94c1..37b94e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-908: support for partitioned input in giraph (pavanka)
+
GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)
GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index ec0ddbb..2e35373 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -76,6 +76,25 @@ public abstract class BspService<I extends WritableComparable,
public static final String BASE_DIR = "/_hadoopBsp";
/** Master job state znode above base dir */
public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
+
+ /** Mapping input split directory about base dir */
+ public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
+ /** Mapping input split done directory about base dir */
+ public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
+ "/_mappingInputSplitDoneDir";
+ /** Denotes a reserved mapping input split */
+ public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
+ "/_mappingInputSplitReserved";
+ /** Denotes a finished mapping input split */
+ public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
+ "/_mappingInputSplitFinished";
+ /** Denotes that all the mapping input splits are ready for consumption */
+ public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
+ "/_mappingInputSplitsAllReady";
+ /** Denotes that all the mapping input splits are done. */
+ public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
+ "/_mappingInputSplitsAllDone";
+
/** Vertex input split directory about base dir */
public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
/** Vertex input split done directory about base dir */
@@ -93,6 +112,7 @@ public abstract class BspService<I extends WritableComparable,
/** Denotes that all the vertex input splits are done. */
public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
"/_vertexInputSplitsAllDone";
+
/** Edge input split directory about base dir */
public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
/** Edge input split done directory about base dir */
@@ -188,10 +208,14 @@ public abstract class BspService<I extends WritableComparable,
protected final String basePath;
/** Path to the job state determined by the master (informative only) */
protected final String masterJobStatePath;
+ /** ZooKeeper paths for mapping input splits. */
+ protected final InputSplitPaths mappingInputSplitsPaths;
/** ZooKeeper paths for vertex input splits. */
protected final InputSplitPaths vertexInputSplitsPaths;
/** ZooKeeper paths for edge input splits. */
protected final InputSplitPaths edgeInputSplitsPaths;
+ /** Mapping input splits events */
+ protected final InputSplitEvents mappingInputSplitsEvents;
/** Vertex input split events. */
protected final InputSplitEvents vertexInputSplitsEvents;
/** Edge input split events. */
@@ -263,6 +287,7 @@ public abstract class BspService<I extends WritableComparable,
public BspService(
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
+ this.mappingInputSplitsEvents = new InputSplitEvents(context);
this.vertexInputSplitsEvents = new InputSplitEvents(context);
this.edgeInputSplitsEvents = new InputSplitEvents(context);
this.connectedEvent = new PredicateLock(context);
@@ -313,6 +338,10 @@ public abstract class BspService<I extends WritableComparable,
getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
basePath);
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
+ mappingInputSplitsPaths = new InputSplitPaths(basePath,
+ MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
+ MAPPING_INPUT_SPLITS_ALL_READY_NODE,
+ MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
vertexInputSplitsPaths = new InputSplitPaths(basePath,
VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
@@ -676,8 +705,6 @@ public abstract class BspService<I extends WritableComparable,
* watches to see if the master commanded job state changes.
*
* @return Last job state or null if none
- * @throws InterruptedException
- * @throws KeeperException
*/
public final JSONObject getJobState() {
try {
@@ -784,8 +811,6 @@ public abstract class BspService<I extends WritableComparable,
* Get the latest superstep and cache it.
*
* @return the latest superstep
- * @throws InterruptedException
- * @throws KeeperException
*/
public final long getSuperstep() {
if (cachedSuperstep != UNSET_SUPERSTEP) {
@@ -959,7 +984,125 @@ public abstract class BspService<I extends WritableComparable,
}
workerHealthRegistrationChanged.signal();
eventProcessed = true;
+ } else if (processMappingEvent(event) || processVertexEvent(event) ||
+ processEdgeEvent(event)) {
+ return;
+ } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
+ event.getType() == EventType.NodeCreated) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: partitionAssignmentsReadyChanged " +
+ "(partitions are assigned)");
+ }
+ addressesAndPartitionsReadyChanged.signal();
+ eventProcessed = true;
+ } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
+ event.getType() == EventType.NodeCreated) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: superstepFinished signaled");
+ }
+ superstepFinished.signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(applicationAttemptsPath) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: applicationAttemptChanged signaled");
+ }
+ applicationAttemptChanged.signal();
+ eventProcessed = true;
+ } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: masterElectionChildrenChanged signaled");
+ }
+ masterElectionChildrenChanged.signal();
+ eventProcessed = true;
+ } else if (event.getPath().equals(cleanedUpPath) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: cleanedUpChildrenChanged signaled");
+ }
+ cleanedUpChildrenChanged.signal();
+ eventProcessed = true;
+ }
+
+ if (!(processEvent(event)) && (!eventProcessed)) {
+ LOG.warn("process: Unknown and unprocessed event (path=" +
+ event.getPath() + ", type=" + event.getType() +
+ ", state=" + event.getState() + ")");
+ }
+ }
+
+ /**
+ * Process WatchedEvent for Mapping Inputsplits
+ *
+ * Compares the event path and type against the mapping input split
+ * znodes and signals the matching mapping input split event.
+ *
+ * @param event watched event
+ * @return true if event processed
+ */
+ public final boolean processMappingEvent(WatchedEvent event) {
+ boolean eventProcessed = false;
+ if (event.getPath() == null) {
+ // NOTE(review): events without a path (presumably session-level
+ // notifications) cannot refer to a mapping input split znode
+ return eventProcessed;
+ }
+ if (event.getPath().equals(
+ mappingInputSplitsPaths.getAllReadyPath()) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: mappingInputSplitsReadyChanged " +
+ "(input splits ready)");
+ }
+ mappingInputSplitsEvents.getAllReadyChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: mappingInputSplitsStateChanged " +
+ "(made a reservation)");
+ }
+ mappingInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+ (event.getType() == EventType.NodeDeleted)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: mappingInputSplitsStateChanged " +
+ "(lost a reservation)");
+ }
+ mappingInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: mappingInputSplitsStateChanged " +
+ "(finished inputsplit)");
+ }
+ mappingInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
+ (event.getType() == EventType.NodeChildrenChanged)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: mappingInputSplitsDoneStateChanged " +
+ "(worker finished sending)");
+ }
+ mappingInputSplitsEvents.getDoneStateChanged().signal();
+ eventProcessed = true;
} else if (event.getPath().equals(
+ mappingInputSplitsPaths.getAllDonePath()) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: mappingInputSplitsAllDoneChanged " +
+ "(all entries sent from input splits)");
+ }
+ mappingInputSplitsEvents.getAllDoneChanged().signal();
+ eventProcessed = true;
+ }
+ return eventProcessed;
+ }
+
+ /**
+ * Process WatchedEvent for Vertex Inputsplits
+ *
+ * @param event watched event
+ * @return true if event processed
+ */
+ public final boolean processVertexEvent(WatchedEvent event) {
+ boolean eventProcessed = false;
+ if (event.getPath().equals(
vertexInputSplitsPaths.getAllReadyPath()) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isInfoEnabled()) {
@@ -1009,7 +1152,19 @@ public abstract class BspService<I extends WritableComparable,
}
vertexInputSplitsEvents.getAllDoneChanged().signal();
eventProcessed = true;
- } else if (event.getPath().equals(
+ }
+ return eventProcessed;
+ }
+
+ /**
+ * Process WatchedEvent for Edge Inputsplits
+ *
+ * @param event watched event
+ * @return true if event processed
+ */
+ public final boolean processEdgeEvent(WatchedEvent event) {
+ boolean eventProcessed = false;
+ if (event.getPath().equals(
edgeInputSplitsPaths.getAllReadyPath()) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isInfoEnabled()) {
@@ -1059,48 +1214,7 @@ public abstract class BspService<I extends WritableComparable,
}
edgeInputSplitsEvents.getAllDoneChanged().signal();
eventProcessed = true;
- } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
- event.getType() == EventType.NodeCreated) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: partitionAssignmentsReadyChanged " +
- "(partitions are assigned)");
- }
- addressesAndPartitionsReadyChanged.signal();
- eventProcessed = true;
- } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
- event.getType() == EventType.NodeCreated) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: superstepFinished signaled");
- }
- superstepFinished.signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(applicationAttemptsPath) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: applicationAttemptChanged signaled");
- }
- applicationAttemptChanged.signal();
- eventProcessed = true;
- } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: masterElectionChildrenChanged signaled");
- }
- masterElectionChildrenChanged.signal();
- eventProcessed = true;
- } else if (event.getPath().equals(cleanedUpPath) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: cleanedUpChildrenChanged signaled");
- }
- cleanedUpChildrenChanged.signal();
- eventProcessed = true;
- }
-
- if (!(processEvent(event)) && (!eventProcessed)) {
- LOG.warn("process: Unknown and unprocessed event (path=" +
- event.getPath() + ", type=" + event.getType() +
- ", state=" + event.getState() + ")");
}
+ return eventProcessed;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index bda967d..e5b7cf3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -71,6 +71,16 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
/**
* Create the {@link BspInputSplit} objects from the index range based on the
+ * user-defined MappingInputFormat. The {@link BspInputSplit} objects will
+ * be processed by the workers later on during the INPUT_SUPERSTEP.
+ *
+ * @return Number of splits. Returns -1 on failure to create
+ * valid input splits.
+ */
+ int createMappingInputSplits();
+
+ /**
+ * Create the {@link BspInputSplit} objects from the index range based on the
* user-defined VertexInputFormat. The {@link BspInputSplit} objects will
* processed by the workers later on during the INPUT_SUPERSTEP.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 3337621..e7b18aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
/** Vertex output format class - cached for fast access */
protected Class<? extends VertexOutputFormat<I, V, E>>
vertexOutputFormatClass;
+ /** Mapping input format - cached for fast access */
+ protected Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+ mappingInputFormatClass;
/** Edge input format class - cached for fast access */
protected Class<? extends EdgeInputFormat<I, E>>
edgeInputFormatClass;
@@ -113,8 +117,7 @@ public class GiraphClasses<I extends WritableComparable,
/** Edge Input Filter class */
protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
/** Vertex Input Filter class */
- protected Class<? extends VertexInputFilter<I, V, E>>
- vertexInputFilterClass;
+ protected Class<? extends VertexInputFilter<I, V, E>> vertexInputFilterClass;
/**
* Empty constructor. Initialize with default classes or null.
@@ -181,6 +184,9 @@ public class GiraphClasses<I extends WritableComparable,
EDGE_INPUT_FORMAT_CLASS.get(conf);
edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
EDGE_OUTPUT_FORMAT_CLASS.get(conf);
+ mappingInputFormatClass = (Class<? extends MappingInputFormat<I, V, E,
+ ? extends Writable>>)
+ MAPPING_INPUT_FORMAT_CLASS.get(conf);
aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
messageCombinerClass =
@@ -335,6 +341,15 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Check if MappingInputFormat is set
+ *
+ * @return true if MappingInputFormat is set
+ */
+ public boolean hasMappingInputFormat() {
+ return mappingInputFormatClass != null;
+ }
+
+ /**
* Get VertexOutputFormat set
*
* @return VertexOutputFormat
@@ -344,6 +359,11 @@ public class GiraphClasses<I extends WritableComparable,
return vertexOutputFormatClass;
}
+ /**
+ * Get MappingInputFormat set
+ *
+ * @return cached MappingInputFormat class; null when
+ * hasMappingInputFormat() is false
+ */
+ public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+ getMappingInputFormatClass() {
+ return mappingInputFormatClass;
+ }
+
/**
* Check if EdgeInputFormat is set
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 611a0dc..dd0c9ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -45,6 +45,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -56,6 +57,9 @@ import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.job.GiraphJobRetryChecker;
import org.apache.giraph.job.HaltApplicationUtils;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
@@ -80,6 +84,30 @@ public interface GiraphConstants {
/** 1KB in bytes */
int ONE_KB = 1024;
+ /** Mapping related information */
+ ClassConfOption<? extends MappingStore> MAPPING_STORE_CLASS =
+ ClassConfOption.create("giraph.mappingStoreClass", null,
+ MappingStore.class, "MappingStore Class");
+
+ /** Class to use for performing read operations on mapping store */
+ ClassConfOption<? extends MappingStoreOps> MAPPING_STORE_OPS_CLASS =
+ ClassConfOption.create("giraph.mappingStoreOpsClass", null,
+ MappingStoreOps.class, "MappingStoreOps class");
+
+ /** Upper value of LongByteMappingStore */
+ IntConfOption LB_MAPPINGSTORE_UPPER =
+ new IntConfOption("giraph.lbMappingStoreUpper", -1,
+ "'upper' value used by lbmappingstore");
+ /** Lower value of LongByteMappingStore */
+ IntConfOption LB_MAPPINGSTORE_LOWER =
+ new IntConfOption("giraph.lbMappingStoreLower", -1,
+ "'lower' value used by lbMappingstore");
+ /** Class used to conduct expensive edge translation during vertex input */
+ ClassConfOption EDGE_TRANSLATION_CLASS =
+ ClassConfOption.create("giraph.edgeTranslationClass", null,
+ TranslateEdge.class, "Class used to conduct expensive edge " +
+ "translation during vertex input phase");
+
/** Computation class - required */
ClassConfOption<Computation> COMPUTATION_CLASS =
ClassConfOption.create("giraph.computationClass", null,
@@ -230,6 +258,10 @@ public interface GiraphConstants {
ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
ClassConfOption.create("giraph.edgeInputFormatClass", null,
EdgeInputFormat.class, "EdgeInputFormat class");
+ /**
+ * MappingInputFormat class (giraph.mappingInputFormatClass).
+ * Created with a null default.
+ */
+ ClassConfOption<MappingInputFormat> MAPPING_INPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.mappingInputFormatClass", null,
+ MappingInputFormat.class, "MappingInputFormat class");
/** EdgeInputFilter class */
ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index e9f50f9..3d7b3db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.giraph.conf;
+import com.google.common.base.Preconditions;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.Edge;
@@ -39,12 +40,14 @@ import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
+import org.apache.giraph.io.internal.WrappedMappingInputFormat;
import org.apache.giraph.io.internal.WrappedVertexInputFormat;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -53,6 +56,9 @@ import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.job.GiraphJobRetryChecker;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.master.SuperstepClasses;
@@ -65,6 +71,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.utils.io.BigDataInputOutput;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.giraph.utils.io.ExtendedDataInputOutput;
@@ -91,6 +98,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
V extends Writable, E extends Writable> extends GiraphConfiguration {
/** Holder for all the classes */
private final GiraphClasses classes;
+ /**
+ * Mapping target class. Starts out null; resolved and cached by the first
+ * call to getMappingTargetClass().
+ */
+ private Class<? extends Writable> mappingTargetClass = null;
/** Value (IVEMM) Factories */
private final ValueFactories<I, V, E> valueFactories;
/** Language values (IVEMM) are implemented in */
@@ -148,6 +157,27 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the class used for edge translation during vertex input, read from
+ * the EDGE_TRANSLATION_CLASS option of this configuration.
+ *
+ * @return edge translation class; callers such as
+ * edgeTranslationInstance() treat a null result as "not configured"
+ */
+ public Class<? extends TranslateEdge> edgeTranslationClass() {
+ return EDGE_TRANSLATION_CLASS.get(this);
+ }
+
+ /**
+ * Instantiate the configured TranslateEdge, which contains the helper
+ * methods for edge translation.
+ *
+ * @return new instance of TranslateEdge, or null if no edge translation
+ * class is configured
+ */
+ public TranslateEdge<I, E> edgeTranslationInstance() {
+ Class<? extends TranslateEdge> translationClass = edgeTranslationClass();
+ if (translationClass == null) {
+ return null;
+ }
+ return ReflectionUtils.newInstance(translationClass, this);
+ }
+
+ /**
* Get the vertex input filter class
*
* @return VertexInputFilter class
@@ -274,6 +304,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the user's MappingInputFormat class, as held by the classes holder
+ * of this configuration.
+ *
+ * @return MappingInputFormat class
+ */
+ public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+ getMappingInputFormatClass() {
+ return classes.getMappingInputFormatClass();
+ }
+
+ /**
+ * Check if mappingInputFormat is set, as reported by the classes holder
+ * of this configuration.
+ *
+ * @return true if mappingInputFormat is set
+ */
+ public boolean hasMappingInputFormat() {
+ return classes.hasMappingInputFormat();
+ }
+
+ /**
* Create a user vertex output format class.
* Note: Giraph should only use WrappedVertexOutputFormat,
* which makes sure that Configuration parameters are set properly.
@@ -287,6 +336,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Instantiate the user's mapping input format through reflection.
+ * Note: Giraph should only use WrappedMappingInputFormat,
+ * which makes sure that Configuration parameters are set properly.
+ *
+ * @return Instantiated user mapping input format class
+ */
+ private MappingInputFormat<I, V, E, ? extends Writable>
+ createMappingInputFormat() {
+ return ReflectionUtils.newInstance(getMappingInputFormatClass(), this);
+ }
+
+ /**
* Create a wrapper for user vertex output format,
* which makes sure that Configuration parameters are set properly in all
* methods related to this format.
@@ -300,6 +363,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
return wrappedVertexOutputFormat;
}
+ /**
+ * Instantiate the user mapping input format and wrap it, so that
+ * Configuration parameters are set properly in all methods related to
+ * this format. The wrapper is configured with this configuration when
+ * possible.
+ *
+ * @return Wrapper around user mapping input format
+ */
+ public WrappedMappingInputFormat<I, V, E, ? extends Writable>
+ createWrappedMappingInputFormat() {
+ WrappedMappingInputFormat<I, V, E, ? extends Writable> wrapper =
+ new WrappedMappingInputFormat<>(createMappingInputFormat());
+ configureIfPossible(wrapper);
+ return wrapper;
+ }
+
@Override
public boolean hasEdgeOutputFormat() {
return classes.hasEdgeOutputFormat();
@@ -756,6 +835,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create a translated copy of an edge. The target vertex id is run through
+ * translateEdge.translateId(); the edge value is copied with
+ * translateEdge.cloneValue(), unless edge values are NullWritable, in which
+ * case an edge carrying only the translated id is created.
+ *
+ * @param translateEdge instance of TranslateEdge
+ * @param edge edge to be translated
+ * @return translated edge
+ */
+ public Edge<I, E> createEdge(TranslateEdge<I, E>
+ translateEdge, Edge<I, E> edge) {
+ I targetId = translateEdge.translateId(edge.getTargetVertexId());
+ if (!isEdgeValueNullWritable()) {
+ return EdgeFactory.create(targetId,
+ translateEdge.cloneValue(edge.getValue()));
+ }
+ return (Edge<I, E>) EdgeFactory.create(targetId);
+ }
+
+ /**
* Create a reusable edge.
*
* @return Instantiated reusable edge.
@@ -856,6 +953,74 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get MappingStore class to be used, read from the MAPPING_STORE_CLASS
+ * option of this configuration.
+ *
+ * @return MappingStore class set by user; createMappingStore() treats a
+ * null result as "not configured"
+ */
+ public Class<? extends MappingStore> getMappingStoreClass() {
+ return MAPPING_STORE_CLASS.get(this);
+ }
+
+ /**
+ * Instantiate the configured
+ * {@link org.apache.giraph.mapping.MappingStore}.
+ *
+ * @return new MappingStore instance, or null if no MappingStore class is
+ * configured
+ */
+ public MappingStore<I, ? extends Writable> createMappingStore() {
+ Class<? extends MappingStore> storeClass = getMappingStoreClass();
+ if (storeClass == null) {
+ return null;
+ }
+ return ReflectionUtils.newInstance(storeClass, this);
+ }
+
+ /**
+ * Get MappingStoreOps class to be used, read from the
+ * MAPPING_STORE_OPS_CLASS option of this configuration.
+ *
+ * @return MappingStoreOps class set by user; createMappingStoreOps()
+ * treats a null result as "not configured"
+ */
+ public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
+ return MAPPING_STORE_OPS_CLASS.get(this);
+ }
+
+ /**
+ * Instantiate the configured
+ * {@link org.apache.giraph.mapping.MappingStoreOps}.
+ *
+ * @return new MappingStoreOps instance, or null if no MappingStoreOps
+ * class is configured
+ */
+ public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
+ Class<? extends MappingStoreOps> opsClass = getMappingStoreOpsClass();
+ if (opsClass == null) {
+ return null;
+ }
+ return ReflectionUtils.newInstance(opsClass, this);
+ }
+
+ /**
+ * Get mappingTarget class, i.e. the second type argument with which the
+ * configured MappingStore class implements MappingStore. The class is
+ * resolved on the first call and cached for later calls.
+ *
+ * @return mappingTarget class
+ * @throws NullPointerException if no MappingStore class is configured
+ * @throws IllegalArgumentException if the MappingStore class does not
+ * resolve to exactly two type arguments
+ */
+ public Class<? extends Writable> getMappingTargetClass() {
+ if (mappingTargetClass == null) {
+ Class<? extends MappingStore> storeClass = getMappingStoreClass();
+ // Fail with a readable message instead of deep inside type resolution
+ Preconditions.checkNotNull(storeClass,
+ "getMappingTargetClass: no MappingStore class is configured");
+ Class<?>[] classList = ReflectionUtils.getTypeArguments(
+ MappingStore.class, storeClass);
+ Preconditions.checkArgument(classList.length == 2,
+ "getMappingTargetClass: expected 2 type arguments for %s, got %s",
+ storeClass.getName(), classList.length);
+ mappingTargetClass = (Class<? extends Writable>) classList[1];
+ }
+ return mappingTargetClass;
+ }
+
+ /**
+ * Create and return mappingTarget instance: a new Writable of the class
+ * returned by getMappingTargetClass(), created through
+ * WritableUtils.createWritable().
+ *
+ * @return mappingTarget instance
+ */
+ public Writable createMappingTarget() {
+ return WritableUtils.createWritable(getMappingTargetClass());
+ }
+
+ /**
* Create a user {@link org.apache.giraph.edge.OutEdges}
*
* @return Instantiated user OutEdges
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
new file mode 100644
index 0000000..2666268
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ *
+ * Use this to load mapping data for a BSP application. Note that the
+ * InputSplit must also implement Writable.
+ *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this input
+ * format (context in getSplits and createMappingReader; methods invoked on
+ * MappingReader). So if backing input format relies on some parameters from
+ * configuration, you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public abstract class MappingInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends GiraphInputFormat<I, V, E> {
+
+ /**
+ * Create a mapping reader for a given split. Guaranteed to have been
+ * configured with setConf() prior to use. The framework will also call
+ * {@link MappingReader#initialize(InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param split the split to be read
+ * @param context the information about the task
+ * @return a new mapping reader
+ * @throws IOException
+ */
+ public abstract MappingReader<I, V, E, B> createMappingReader(
+ InputSplit split, TaskAttemptContext context) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
new file mode 100644
index 0000000..b7ce97c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Will read the mapping from an input split, one
+ * {@link MappingEntry} at a time. Also acts as a
+ * {@link WorkerAggregatorUsage} by delegating to the usage supplied through
+ * {@link #setWorkerAggregatorUse(WorkerAggregatorUsage)}.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public abstract class MappingReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements WorkerAggregatorUsage {
+
+ /** Aggregator usage for mapping reader */
+ private WorkerAggregatorUsage workerAggregatorUsage;
+
+ /**
+ * Use the input split and context to setup reading the mapping entries.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param inputSplit Input split to be used for reading mapping entries.
+ * @param context Context from the task.
+ * @throws java.io.IOException
+ * @throws InterruptedException
+ */
+ public abstract void initialize(InputSplit inputSplit,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException;
+
+
+ /**
+ * Set aggregator usage. It provides the functionality
+ * of aggregation operation in reading a mapping entry;
+ * {@link #aggregate(String, Writable)} and
+ * {@link #getAggregatedValue(String)} delegate to it.
+ * It is invoked just after initialization.
+ * E.g.,
+ * mappingReader.initialize(inputSplit, context);
+ * mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+ * This method is only for use by the infrastructure.
+ *
+ * @param agg aggregator usage for mapping reader
+ */
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ workerAggregatorUsage = agg;
+ }
+
+ /**
+ * Advance to the next mapping entry.
+ *
+ * @return false iff there are no more entries
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract boolean nextEntry() throws IOException,
+ InterruptedException;
+
+
+ /**
+ * Get the current entry.
+ *
+ * @return the current entry which has been read.
+ * nextEntry() should be called first.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract MappingEntry<I, B> getCurrentEntry()
+ throws IOException, InterruptedException;
+
+
+ /**
+ * Close this {@link MappingReader} to future operations.
+ *
+ * @throws IOException
+ */
+ public abstract void close() throws IOException;
+
+ /**
+ * How much of the input has the {@link MappingReader} consumed i.e.
+ * has been processed by?
+ *
+ * @return Progress from <code>0.0</code> to <code>1.0</code>.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract float getProgress() throws IOException, InterruptedException;
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ workerAggregatorUsage.aggregate(name, value);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return workerAggregatorUsage.getAggregatedValue(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
new file mode 100644
index 0000000..72f8177
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link org.apache.giraph.io.MappingInputFormat} to make
+ * sure proper configuration parameters are passed around, that user can set
+ * parameters in configuration and they will be available in other methods
+ * related to this format.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends MappingInputFormat<I, V, E, B> {
+ /** originalInputFormat to wrap over; assigned once in the constructor */
+ private final MappingInputFormat<I, V, E, B> originalInputFormat;
+
+ /**
+ * Constructor
+ *
+ * @param mappingInputFormat original mappingInputFormat
+ */
+ public WrappedMappingInputFormat(
+ MappingInputFormat<I, V, E, B> mappingInputFormat) {
+ originalInputFormat = mappingInputFormat;
+ }
+
+ /**
+ * Forward the input spec check to the wrapped format.
+ *
+ * @param conf configuration passed on unchanged to the wrapped format
+ */
+ @Override
+ public void checkInputSpecs(Configuration conf) {
+ originalInputFormat.checkInputSpecs(conf);
+ }
+
+ /**
+ * Get the splits from the wrapped format, handing it a job context built
+ * from this wrapper's configuration and the given context.
+ *
+ * @param context context of the job
+ * @param minSplitCountHint hint for the minimum number of splits
+ * @return splits produced by the wrapped format
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+ throws IOException, InterruptedException {
+ return originalInputFormat.getSplits(
+ HadoopUtils.makeJobContext(getConf(), context),
+ minSplitCountHint);
+ }
+
+ /**
+ * Create the wrapped format's reader with a task attempt context built
+ * from this wrapper's configuration, and wrap that reader in a
+ * WrappedMappingReader.
+ *
+ * @param split the split to be read
+ * @param context the information about the task
+ * @return wrapper around the user's mapping reader
+ * @throws IOException
+ */
+ @Override
+ public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ final MappingReader<I, V, E, B> mappingReader = originalInputFormat
+ .createMappingReader(split,
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ return new WrappedMappingReader<>(mappingReader, getConf());
+ }
+
+
+ /**
+ * Let the wrapped format write the input split.
+ *
+ * @param inputSplit split to write
+ * @param dataOutput where to write it
+ * @throws IOException
+ */
+ @Override
+ public void writeInputSplit(InputSplit inputSplit,
+ DataOutput dataOutput) throws IOException {
+ originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+ }
+
+ /**
+ * Let the wrapped format read an input split.
+ *
+ * @param dataInput where to read the split from
+ * @return split read by the wrapped format
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ @Override
+ public InputSplit readInputSplit(
+ DataInput dataInput) throws IOException, ClassNotFoundException {
+ return originalInputFormat.readInputSplit(dataInput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
new file mode 100644
index 0000000..7d1c4c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps {@link org.apache.giraph.io.MappingReader} to make sure proper
+ * configuration parameters are passed around, that parameters set in original
+ * configuration are available in methods of this reader
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends MappingReader<I, V, E, B> {
+ /** User set baseMappingReader wrapped over */
+ private final MappingReader<I, V, E, B> baseMappingReader;
+
+ /**
+ * Constructor. Sets the configuration on both this wrapper and the
+ * wrapped reader.
+ *
+ * @param baseMappingReader User set baseMappingReader
+ * @param conf configuration
+ */
+ public WrappedMappingReader(MappingReader<I, V, E, B> baseMappingReader,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ this.baseMappingReader = baseMappingReader;
+ // super.setConf() because setConf() is overridden below to do nothing
+ super.setConf(conf);
+ baseMappingReader.setConf(conf);
+ }
+
+ /**
+ * Intentionally does nothing: the configuration given to the constructor
+ * is the one this wrapper keeps.
+ *
+ * @param conf ignored
+ */
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ // We don't want to use external configuration
+ }
+
+ /**
+ * Initialize the wrapped reader with a task attempt context built from
+ * this wrapper's configuration.
+ *
+ * @param inputSplit Input split to be used for reading mapping entries.
+ * @param context Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ baseMappingReader.initialize(inputSplit,
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+
+ /**
+ * Set aggregator usage on this wrapper and on the wrapped reader.
+ *
+ * @param agg aggregator usage for mapping reader
+ */
+ @Override
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ // Keep our own reference too: aggregate() and getAggregatedValue() are
+ // inherited from MappingReader and would otherwise hit a null usage
+ super.setWorkerAggregatorUse(agg);
+ // Set aggregator usage for the wrapped mapping reader
+ baseMappingReader.setWorkerAggregatorUse(agg);
+ }
+
+ @Override
+ public boolean nextEntry() throws IOException, InterruptedException {
+ return baseMappingReader.nextEntry();
+ }
+
+ @Override
+ public MappingEntry<I, B> getCurrentEntry()
+ throws IOException, InterruptedException {
+ return baseMappingReader.getCurrentEntry();
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ baseMappingReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return baseMappingReader.getProgress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
new file mode 100644
index 0000000..d8b9c31
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Adapts a {@link GiraphReader} of mapping entries to the
+ * {@link org.apache.giraph.io.MappingReader} API.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingReaderWrapper<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends MappingReader<I, V, E, B> {
+ /** Wrapped mapping reader */
+ private GiraphReader<MappingEntry<I, B>> mappingReader;
+ /**
+ * {@link org.apache.giraph.io.MappingReader}-like wrapper of
+ * {@link #mappingReader}
+ */
+ private IteratorToReaderWrapper<MappingEntry<I, B>> iterator;
+
+ /**
+ * Constructor
+ *
+ * @param mappingReader user supplied mappingReader
+ */
+ public MappingReaderWrapper(GiraphReader<MappingEntry<I, B>> mappingReader) {
+ this.mappingReader = mappingReader;
+ this.iterator =
+ new IteratorToReaderWrapper<MappingEntry<I, B>>(mappingReader);
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ this.mappingReader.initialize(inputSplit, context);
+ }
+
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ super.setConf(conf);
+ conf.configureIfPossible(this.mappingReader);
+ }
+
+ @Override
+ public boolean nextEntry() throws IOException, InterruptedException {
+ return this.iterator.nextObject();
+ }
+
+ @Override
+ public MappingEntry<I, B> getCurrentEntry()
+ throws IOException, InterruptedException {
+ return this.iterator.getCurrentObject();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return this.mappingReader.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.mappingReader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
new file mode 100644
index 0000000..2e2310b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Implementation of basic methods in MappingStoreOps for a
+ * LongWritable to ByteWritable mapping backed by a LongByteMappingStore.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public abstract class AbstractLongByteOps
+ implements MappingStoreOps<LongWritable, ByteWritable> {
+ /** Mapping store instance to operate on */
+ protected LongByteMappingStore mappingStore;
+
+ /**
+ * Remember the mapping store to operate on.
+ *
+ * @param mappingStore store to use; must be a LongByteMappingStore,
+ * otherwise the cast fails with a ClassCastException
+ */
+ @Override
+ public void initialize(MappingStore<LongWritable,
+ ByteWritable> mappingStore) {
+ this.mappingStore = (LongByteMappingStore) mappingStore;
+ }
+
+ /**
+ * Compute partition given id, partitionCount, workerCount & target.
+ * A target of -1 falls back to hash based partitioning over all
+ * partitions. Any other target is read as an unsigned worker index and
+ * the id is hashed into the block of ceil(partitionCount / workerCount)
+ * consecutive partitions assumed to belong to that worker.
+ *
+ * NOTE(review): workerCount == 0 divides by zero, and when partitionCount
+ * is not a multiple of workerCount the result for the last workers can be
+ * &gt;= partitionCount - confirm that callers rule both cases out.
+ *
+ * @param id vertex id
+ * @param partitionCount number of partitions
+ * @param workerCount number of workers
+ * @param target target worker
+ * @return partition number
+ */
+ protected int computePartition(LongWritable id, int partitionCount,
+ int workerCount, byte target) {
+ // partitions per worker, rounded up
+ int numRows = partitionCount / workerCount;
+ numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+ if (target == -1) {
+ // default to hash based partitioning
+ return Math.abs(id.hashCode() % partitionCount);
+ } else {
+ // interpret the byte as unsigned
+ int targetWorker = target & 0xFF;
+ // assume zero based indexing of partition & worker [also consecutive]
+ return numRows * targetWorker + Math.abs(id.hashCode() % numRows);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
new file mode 100644
index 0000000..0b6f3e4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation that carries the target information inside
+ * the vertex id itself, in the higher order bits of the long id
+ */
+public class DefaultEmbeddedLongByteOps extends AbstractLongByteOps {
+  /** Bit mask for the 9 highest order bits (55..63) of a long */
+  private static final long MASK = 0x1FFL << 55;
+  /** Inverse of MASK, i.e. the 55 low order bits */
+  private static final long IMASK = ~MASK;
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultEmbeddedLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return true;
+  }
+
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    long rawId = id.get();
+    if ((rawId & MASK) != 0) {
+      throw new IllegalStateException(
+          "Expected first 9 bits of long  to be empty");
+    }
+    byte target = mappingStore.getByteTarget(id);
+    // The top bit stays 0 and the next 8 bits hold (target + 1).
+    // The +1 keeps "assigned to worker 0" apart from "nothing embedded":
+    // an id without a mapping entry has target = -1, so -1 + 1 = 0, and a
+    // vertex created later during computation has zero prefix bits anyway.
+    long prefix = ((target + 1L) & 0xFF) << 55;
+    id.set(rawId | prefix);
+  }
+
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    long stripped = id.get() & IMASK;
+    id.set(stripped);
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // read back the 8 embedded bits (55..62) ...
+    long prefix = (id.get() >>> 55) & 0xFF;
+    // ... and undo the +1 applied by embedTargetInfo (unset = -1)
+    byte target = (byte) (prefix - 1);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
new file mode 100644
index 0000000..57ece94
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation which reads partition information from map.
+ * Nothing is embedded into vertex ids, so the embed/remove operations are
+ * unsupported and every partition lookup queries the mapping store.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class DefaultLongByteOps extends AbstractLongByteOps {
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return false;
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @param id vertexId
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @param id vertexId
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // -1 (no entry in the store) makes computePartition fall back to hashing
+    byte target = mappingStore.getByteTarget(id);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
new file mode 100644
index 0000000..996fed0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import com.google.common.collect.MapMaker;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ *
+ * An implementation of {@code MappingStore<LongWritable, ByteWritable>}
+ *
+ * A vertex id is split into a prefix (the map key) and a suffix made of the
+ * low order bits (the index into the byte array stored for that prefix).
+ *
+ * Methods implemented here are thread safe by default because it is guaranteed
+ * that each entry is written to only once.
+ * It can represent up to a maximum of 254 workers
+ * any byte passed is treated as unsigned; the value -1 marks a slot that has
+ * no target assigned
+ */
+@ThreadSafe
+public class LongByteMappingStore
+  extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
+  Writable> implements MappingStore<LongWritable, ByteWritable> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+    LongByteMappingStore.class);
+
+  /** Counts number of entries added */
+  private final AtomicLong numEntries = new AtomicLong(0);
+
+  /** Id prefix to bytesArray index mapping */
+  private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
+  /** Primitive idToBytes for faster querying */
+  private Long2ObjectOpenHashMap<byte[]> idToBytes;
+  /** Number of lower order bits */
+  private int lower;
+  /** Number of distinct prefixes */
+  private int upper;
+  /** Bit mask for lowerOrder suffix bits */
+  private int lowerBitMask;
+  /** LowerOrder bits count */
+  private int lowerOrder;
+
+  /**
+   * Read the sizing options from the configuration and allocate the maps.
+   *
+   * @throws IllegalStateException if the configured lower value is not a
+   *         positive power of two
+   */
+  @Override
+  public void initialize() {
+    upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
+    lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
+
+    // x & (x - 1) is also 0 for 0 and Integer.MIN_VALUE, neither of which is
+    // a usable array size, so non-positive values are rejected explicitly
+    if (lower <= 0 || (lower & (lower - 1)) != 0) {
+      throw new IllegalStateException("lower not a power of two");
+    }
+
+    lowerBitMask = lower - 1;
+    lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)
+    concurrentIdToBytes = new MapMaker()
+      .initialCapacity(upper)
+      .concurrencyLevel(getConf().getNumInputSplitsThreads())
+      .makeMap();
+    idToBytes = new Long2ObjectOpenHashMap<>(upper);
+  }
+
+  /**
+   * Auxiliary method to be used by getTarget. Reads from the primitive map,
+   * which is only populated by postFilling.
+   *
+   * @param vertexId vertexId
+   * @return return byte value of target, -1 if none is stored
+   */
+  public byte getByteTarget(LongWritable vertexId) {
+    long id = vertexId.get();
+    // single lookup: stored arrays are never null, so null means "no entry"
+    byte[] bytes = idToBytes.get(id >>> lowerOrder);
+    if (bytes == null) {
+      return -1;
+    }
+    return bytes[(int) (id & lowerBitMask)];
+  }
+
+  @Override
+  public void addEntry(LongWritable vertexId, ByteWritable target) {
+    long key = vertexId.get() >>> lowerOrder;
+    byte[] bytes = concurrentIdToBytes.get(key);
+    if (bytes == null) {
+      byte[] newBytes = new byte[lower];
+      Arrays.fill(newBytes, (byte) -1);
+      // another thread may have installed an array for this key meanwhile;
+      // putIfAbsent returns that array, or null if ours was installed
+      bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
+      if (bytes == null) {
+        bytes = newBytes;
+      }
+    }
+    bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
+    numEntries.getAndIncrement(); // increment count
+  }
+
+  /**
+   * Get target for given vertexId
+   *
+   * @param vertexId vertexId
+   * @param target instance to use for storing target information
+   * @return the target instance filled in, or null if no worker was assigned
+   *         by the mapping
+   */
+  @Override
+  public ByteWritable getTarget(LongWritable vertexId,
+    ByteWritable target) {
+    byte bval = getByteTarget(vertexId);
+    if (bval == -1) { // worker not assigned by mapping
+      return null;
+    }
+    target.set(bval);
+    return target;
+  }
+
+  /**
+   * Move all entries from the concurrent map used while filling into the
+   * primitive map used for querying, then release the concurrent map.
+   */
+  @Override
+  public void postFilling() {
+    // not thread-safe
+    idToBytes.putAll(concurrentIdToBytes);
+    concurrentIdToBytes.clear();
+    concurrentIdToBytes = null;
+  }
+
+  @Override
+  public long getStats() {
+    return numEntries.longValue();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
new file mode 100644
index 0000000..8c8efa1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An entry in MappingStore: a mutable pair of a vertex id and the mapping
+ * target associated with it
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class MappingEntry<I extends WritableComparable, B extends Writable> {
+ /** Vertex Id */
+ private I vertexId;
+ /** Mapping Target */
+ private B mappingTarget;
+
+ /**
+ * Constructor
+ *
+ * @param vertexId vertexId
+ * @param mappingTarget mappingTarget
+ */
+ public MappingEntry(I vertexId, B mappingTarget) {
+ this.vertexId = vertexId;
+ this.mappingTarget = mappingTarget;
+ }
+
+ /**
+ * Get the vertex id of this entry
+ *
+ * @return vertexId (the stored reference, not a copy)
+ */
+ public I getVertexId() {
+ return vertexId;
+ }
+
+ /**
+ * Get the mapping target of this entry
+ *
+ * @return mappingTarget (the stored reference, not a copy)
+ */
+ public B getMappingTarget() {
+ return mappingTarget;
+ }
+
+ /**
+ * Replace the vertex id of this entry
+ *
+ * @param vertexId new vertexId
+ */
+ public void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ }
+
+ /**
+ * Replace the mapping target of this entry
+ *
+ * @param mappingTarget new mappingTarget
+ */
+ public void setMappingTarget(B mappingTarget) {
+ this.mappingTarget = mappingTarget;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
new file mode 100644
index 0000000..5b872a4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * MappingStore used to store the vertexId - target map supplied by user.
+ * Implementations of the methods declared here need to be thread safe
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface MappingStore<I extends WritableComparable, B extends Writable>
+ extends ImmutableClassesGiraphConfigurable<I, Writable, Writable> {
+
+ /**
+ * Must be called before anything else can be done
+ * on this instance
+ */
+ void initialize();
+
+ /**
+ * Add an entry to the mapping store
+ *
+ * @param vertexId vertexId
+ * @param target target
+ */
+ void addEntry(I vertexId, B target);
+
+ /**
+ * Get target for given vertexId
+ *
+ * @param vertexId vertexId
+ * @param target instance to use for storing target information
+ * @return target instance; an implementation may return null when the
+ * vertexId has no target (LongByteMappingStore does)
+ */
+ B getTarget(I vertexId, B target);
+
+ /**
+ * Operations to perform after adding entries
+ * to the mapping store, i.e. once all addEntry calls are done
+ */
+ void postFilling();
+
+ /**
+ * Get stats about the MappingStore
+ *
+ * @return numEntries
+ */
+ long getStats();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
new file mode 100644
index 0000000..aba034e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface of operations that can be done on mapping store
+ * once it is fully loaded
+ *
+ * @param <I> vertex id type
+ * @param <B> mapping target type
+ */
+public interface MappingStoreOps<I extends WritableComparable,
+ B extends Writable> {
+
+ /**
+ * Must be called before anything else can be done
+ * on this instance
+ * @param mappingStore mapping store instance to operate on
+ */
+ void initialize(MappingStore<I, B> mappingStore);
+
+ /**
+ * True if MappingStoreOps is based on embedding info
+ *
+ * @return true if worker info is embedded into vertex ids
+ */
+ boolean hasEmbedding();
+
+ /**
+ * Embed target information into vertexId. The given id instance is
+ * modified in place. Implementations without embedding may throw
+ * UnsupportedOperationException (DefaultLongByteOps does)
+ *
+ * @param id vertexId
+ */
+ void embedTargetInfo(I id);
+
+ /**
+ * Remove target information from vertexId. The given id instance is
+ * modified in place. Implementations without embedding may throw
+ * UnsupportedOperationException (DefaultLongByteOps does)
+ *
+ * @param id vertexId
+ */
+ void removeTargetInfo(I id);
+
+
+ /**
+ * Get partition id for a vertex id
+ *
+ * @param id vertexId
+ * @param partitionCount partitionCount
+ * @param workerCount workerCount
+ * @return partition of vertex id
+ */
+ int getPartition(I id, int partitionCount, int workerCount);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
new file mode 100644
index 0000000..ae23475
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package contains definition and implementations of MappingStore and
+ * related concepts
+ */
+package org.apache.giraph.mapping;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
new file mode 100644
index 0000000..102ef50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Basic implementation of Translate Edge
+ * for I = LongWritable & B = ByteWritable. Id translation is shared by all
+ * edge value types; cloning the edge value is left to the nested subclasses.
+ *
+ * @param <E> edge value type
+ */
+@SuppressWarnings("unchecked")
+public class LongByteTranslateEdge<E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable
+  implements TranslateEdge<LongWritable, E> {
+
+  /** Local data used for targetId translation of edge */
+  private LocalData<LongWritable,
+    ? extends Writable, E, ByteWritable> localData;
+
+  @Override
+  public void initialize(BspServiceWorker<LongWritable,
+    ? extends Writable, E> service) {
+    localData = (LocalData<LongWritable, ? extends Writable, E, ByteWritable>)
+      service.getLocalData();
+  }
+
+  @Override
+  public LongWritable translateId(LongWritable targetId) {
+    // work on a copy so the caller's instance is left untouched
+    LongWritable copy = new LongWritable(targetId.get());
+    localData.getMappingStoreOps().embedTargetInfo(copy);
+    return copy;
+  }
+
+  @Override
+  public E cloneValue(E edgeValue) {
+    // Only usable directly when the vertex input never needs edge values
+    // cloned (it does not create edges); otherwise pick one of the nested
+    // subclasses matching the edge value type
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * cloneValue implementation for edge value = NullWritable
+   */
+  public static class NoEdgeValue
+    extends LongByteTranslateEdge<NullWritable> {
+    @Override
+    public NullWritable cloneValue(NullWritable edgeValue) {
+      return NullWritable.get();
+    }
+  }
+
+  /**
+   * cloneValue implementation for edge value = IntWritable
+   */
+  public static class IntEdgeValue
+    extends LongByteTranslateEdge<IntWritable> {
+    @Override
+    public IntWritable cloneValue(IntWritable edgeValue) {
+      int value = edgeValue.get();
+      return new IntWritable(value);
+    }
+  }
+
+  /**
+   * cloneValue implementation for edge value = LongWritable
+   */
+  public static class LongEdgeValue
+    extends LongByteTranslateEdge<LongWritable> {
+    @Override
+    public LongWritable cloneValue(LongWritable edgeValue) {
+      long value = edgeValue.get();
+      return new LongWritable(value);
+    }
+  }
+
+  /**
+   * cloneValue implementation for edge value = FloatWritable
+   */
+  public static class FloatEdgeValue
+    extends LongByteTranslateEdge<FloatWritable> {
+    @Override
+    public FloatWritable cloneValue(FloatWritable edgeValue) {
+      float value = edgeValue.get();
+      return new FloatWritable(value);
+    }
+  }
+
+  /**
+   * cloneValue implementation for edge value = DoubleWritable
+   */
+  public static class DoubleEdgeValue
+    extends LongByteTranslateEdge<DoubleWritable> {
+    @Override
+    public DoubleWritable cloneValue(DoubleWritable edgeValue) {
+      double value = edgeValue.get();
+      return new DoubleWritable(value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
new file mode 100644
index 0000000..85e8768
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Used to conduct expensive translation of edges
+ * during vertex input phase
+ *
+ * @param <I> vertexId type
+ * @param <E> edgeValue type
+ */
+public interface TranslateEdge<I extends WritableComparable, E extends Writable>
+ extends ImmutableClassesGiraphConfigurable {
+ /**
+ * Must be called before other methods can be used
+ *
+ * @param service bsp service worker
+ */
+ void initialize(BspServiceWorker<I, ? extends Writable, E> service);
+
+ /**
+ * Translate Id & return a new instance; the contract is that the result
+ * is a separate instance from the targetId passed in
+ *
+ * @param targetId edge target Id
+ * @return a new translated Id instance
+ */
+ I translateId(I targetId);
+
+ /**
+ * Clone edge value
+ *
+ * @param edgeValue edge value
+ * @return clone of edge value
+ */
+ E cloneValue(E edgeValue);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
new file mode 100644
index 0000000..8536e83
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Definitions & sample implementations of edge translation logic
+ */
+package org.apache.giraph.mapping.translate;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 90dc9f3..e367b94 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -40,6 +40,7 @@ import org.apache.giraph.graph.GraphState;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.partition.MasterGraphPartitioner;
import org.apache.giraph.partition.PartitionOwner;
@@ -119,7 +120,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
* @param <V> Vertex data
* @param <E> Edge data
*/
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("rawtypes, unchecked")
public class BspServiceMaster<I extends WritableComparable,
V extends Writable, E extends Writable>
extends BspService<I, V, E>
@@ -217,7 +218,7 @@ public class BspServiceMaster<I extends WritableComparable,
observers = conf.createMasterObservers();
GiraphMetrics.get().addSuperstepResetObserver(this);
- GiraphStats.init(context);
+ GiraphStats.init((Mapper.Context) context);
}
@Override
@@ -676,6 +677,17 @@ public class BspServiceMaster<I extends WritableComparable,
}
@Override
+ public int createMappingInputSplits() {
+ if (!getConfiguration().hasMappingInputFormat()) {
+ return 0;
+ }
+ MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
+ getConfiguration().createWrappedMappingInputFormat();
+ return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
+ "Mapping");
+ }
+
+ @Override
public int createVertexInputSplits() {
// Short-circuit if there is no vertex input format
if (!getConfiguration().hasVertexInputFormat()) {
@@ -1589,8 +1601,12 @@ public class BspServiceMaster<I extends WritableComparable,
if (getSuperstep() == INPUT_SUPERSTEP) {
// Initialize aggregators before coordinating
- // vertex loading and edge loading
initializeAggregatorInputSuperstep();
+ if (getConfiguration().hasMappingInputFormat()) {
+ coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
+ "Mapping");
+ }
+ // vertex loading and edge loading
if (getConfiguration().hasVertexInputFormat()) {
coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
"Vertex");
[2/3] GIRAPH-908: support for partitioned input in giraph (pavanka)
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 15dbe07..0635210 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -107,7 +107,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
// Attempt to create InputSplits if necessary. Bail out if that fails.
if (bspServiceMaster.getRestartedSuperstep() !=
BspService.UNSET_SUPERSTEP ||
- (bspServiceMaster.createVertexInputSplits() != -1 &&
+ (bspServiceMaster.createMappingInputSplits() != -1 &&
+ bspServiceMaster.createVertexInputSplits() != -1 &&
bspServiceMaster.createEdgeInputSplits() != -1)) {
long setupMillis = System.currentTimeMillis() - initializeMillis;
GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
@@ -123,7 +124,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
superstepState = bspServiceMaster.coordinateSuperstep();
long superstepMillis = System.currentTimeMillis() -
startSuperstepMillis;
- superstepSecsMap.put(Long.valueOf(cachedSuperstep),
+ superstepSecsMap.put(cachedSuperstep,
superstepMillis / 1000.0d);
if (LOG.isInfoEnabled()) {
LOG.info("masterThread: Coordination of superstep " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index 4200d79..c5e2f3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -19,6 +19,7 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -32,7 +33,14 @@ import org.apache.hadoop.io.WritableComparable;
@SuppressWarnings("rawtypes")
public interface GraphPartitionerFactory<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- ImmutableClassesGiraphConfigurable {
+ ImmutableClassesGiraphConfigurable<I, V, E> {
+
+ /**
+ * Use some local data present in the worker
+ *
+ * @param localData localData present in the worker
+ */
+ void initialize(LocalData<I, V, E, ? extends Writable> localData);
/**
* Create the {@link MasterGraphPartitioner} used by the master.
* Instantiated once by the master and reused.
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index 7cc5651..221e50d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -18,7 +18,8 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
*/
@SuppressWarnings("rawtypes")
public class HashPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements GraphPartitionerFactory<I, V, E> {
- /** Saved configuration */
- private ImmutableClassesGiraphConfiguration conf;
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements GraphPartitionerFactory<I, V, E> {
+
+ @Override
+ public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+ }
@Override
public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashPartitionerFactory<I extends WritableComparable,
public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
return new HashWorkerPartitioner<I, V, E>();
}
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 1eeece7..5f7ee40 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -18,7 +18,8 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
*/
@SuppressWarnings("rawtypes")
public class HashRangePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements GraphPartitionerFactory<I, V, E> {
- /** Saved configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements GraphPartitionerFactory<I, V, E> {
+
+ @Override
+ public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+ }
@Override
public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashRangePartitionerFactory<I extends WritableComparable,
public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
return new HashRangeWorkerPartitioner<I, V, E>();
}
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
new file mode 100644
index 0000000..e129050
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Factory for long-byte mapping based partitioners. Partition lookup is
+ * delegated to the mapping store ops of the worker-supplied local data.
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+@SuppressWarnings("unchecked")
+public class LongMappingStorePartitionerFactory<V extends Writable,
+ E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+ /** Logger Instance */
+ private static final Logger LOG = Logger.getLogger(
+ LongMappingStorePartitionerFactory.class);
+ /** Local data supplying the mapping store ops; null until initialize() */
+ protected LocalData<LongWritable, V, E, ? extends Writable> localData = null;
+
+ @Override
+ public void initialize(LocalData<LongWritable, V, E,
+ ? extends Writable> localData) {
+ this.localData = localData;
+ LOG.info("Initializing LongMappingStorePartitionerFactory with localData");
+ }
+
+ @Override
+ protected int getPartition(LongWritable id, int partitionCount,
+ int workerCount) {
+ return localData.getMappingStoreOps().getPartition(id,
+ partitionCount, workerCount);
+ }
+
+ @Override
+ protected int getWorker(int partition, int partitionCount, int workerCount) {
+ int numRows = partitionCount / workerCount; // rounded up just below
+ numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+ return partition / numRows; // consecutive partitions share a worker
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 8ab692f..5dd580b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
* @param <V> Vertex value type
* @param <E> Edge value type
*/
-public class SimpleIntRangePartitionerFactory
- <V extends Writable, E extends Writable>
- extends SimplePartitionerFactory<IntWritable, V, E> {
+public class SimpleIntRangePartitionerFactory<V extends Writable,
+ E extends Writable> extends SimplePartitionerFactory<IntWritable, V, E> {
/** Vertex key space size. */
private int keySpaceSize;
@Override
+ protected int getPartition(IntWritable id, int partitionCount,
+ int workerCount) {
+ return getPartition(id, partitionCount);
+ }
+
+ /**
+ * Calculates which partition the current vertex belongs to,
+ * from interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @return partition
+ */
protected int getPartition(IntWritable id, int partitionCount) {
return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 2989598..e637e16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
* @param <V> Vertex value type
* @param <E> Edge value type
*/
-public class SimpleLongRangePartitionerFactory
- <V extends Writable, E extends Writable>
- extends SimplePartitionerFactory<LongWritable, V, E> {
+public class SimpleLongRangePartitionerFactory<V extends Writable,
+ E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
/** Vertex key space size. */
private long keySpaceSize;
@Override
+ protected int getPartition(LongWritable id, int partitionCount,
+ int workerCount) {
+ return getPartition(id, partitionCount);
+ }
+
+ /**
+ * Calculates which partition the current vertex belongs to,
+ * from interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @return partition
+ */
protected int getPartition(LongWritable id, int partitionCount) {
return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
index 15b0756..1e29846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
@@ -18,7 +18,8 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -33,14 +34,17 @@ import org.apache.hadoop.io.WritableComparable;
* @param <E> Edge value
*/
public abstract class SimplePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements GraphPartitionerFactory<I, V, E> {
- /** Configuration. */
- private ImmutableClassesGiraphConfiguration conf;
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements GraphPartitionerFactory<I, V, E> {
+
+ @Override
+ public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+ }
@Override
public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
- return new SimpleMasterPartitioner<I, V, E>(conf) {
+ return new SimpleMasterPartitioner<I, V, E>(getConf()) {
@Override
protected int getWorkerIndex(int partition, int partitionCount,
int workerCount) {
@@ -54,31 +58,26 @@ public abstract class SimplePartitionerFactory<I extends WritableComparable,
public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
return new SimpleWorkerPartitioner<I, V, E>() {
@Override
- protected int getPartitionIndex(I id, int partitionCount) {
- return SimplePartitionerFactory.this.getPartition(id, partitionCount);
+ protected int getPartitionIndex(I id, int partitionCount,
+ int workerCount) {
+ return SimplePartitionerFactory.this.getPartition(id,
+ partitionCount, workerCount);
}
};
}
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-
- @Override
- public final ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
/**
* Calculates in which partition current vertex belongs to,
* from interval [0, partitionCount).
*
* @param id Vertex id
* @param partitionCount Number of partitions
+ * @param workerCount Number of workers
* @return partition
*/
- protected abstract int getPartition(I id, int partitionCount);
+ protected abstract int getPartition(I id, int partitionCount,
+ int workerCount);
+
/**
* Calculates worker that should be responsible for passed partition.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
index 600d7a3..3c0de44 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -19,13 +19,16 @@
package org.apache.giraph.partition;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
/**
* Abstracts and implements all WorkerGraphPartitioner logic on top of a single
@@ -38,8 +41,13 @@ import com.google.common.collect.Lists;
public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
V extends Writable, E extends Writable>
implements WorkerGraphPartitioner<I, V, E> {
+ /** Logger instance */
+ private static final Logger LOG = Logger.getLogger(
+ SimpleWorkerPartitioner.class);
/** List of {@link PartitionOwner}s for this worker. */
private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+ /** Set of workers that currently own at least one partition */
+ private Set<WorkerInfo> availableWorkers = new HashSet<>();
@Override
public PartitionOwner createPartitionOwner() {
@@ -49,7 +57,8 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
@Override
public PartitionOwner getPartitionOwner(I vertexId) {
return partitionOwnerList.get(
- getPartitionIndex(vertexId, partitionOwnerList.size()));
+ getPartitionIndex(vertexId, partitionOwnerList.size(),
+ availableWorkers.size()));
}
@Override
@@ -64,8 +73,11 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
PartitionStore<I, V, E> partitionStore) {
- return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
- myWorkerInfo, masterSetPartitionOwners, partitionStore);
+ PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
+ partitionOwnerList, myWorkerInfo, masterSetPartitionOwners,
+ partitionStore);
+ extractAvailableWorkers();
+ return exchange;
}
@Override
@@ -74,12 +86,26 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
}
/**
+ * Rebuild availableWorkers from the owners in partitionOwnerList
+ */
+ public void extractAvailableWorkers() {
+ availableWorkers.clear();
+ for (PartitionOwner partitionOwner : partitionOwnerList) {
+ availableWorkers.add(partitionOwner.getWorkerInfo());
+ }
+ LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
+ " workers are available");
+ }
+
+ /**
* Calculates in which partition current vertex belongs to,
* from interval [0, partitionCount).
*
* @param id Vertex id
* @param partitionCount Number of partitions
+ * @param workerCount Number of active workers
* @return partition
*/
- protected abstract int getPartitionIndex(I id, int partitionCount);
+ protected abstract int getPartitionIndex(I id, int partitionCount,
+ int workerCount);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index aff7084..104932c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -46,6 +46,7 @@ import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
@@ -133,7 +134,10 @@ public class BspServiceWorker<I extends WritableComparable,
private final WorkerInfo workerInfo;
/** Worker graph partitioner */
private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
-
+ /** Local Data for each worker */
+ private final LocalData<I, V, E, ? extends Writable> localData;
+ /** Used to translate Edges during vertex input phase based on localData */
+ private final TranslateEdge<I, E> translateEdge;
/** IPC Client */
private final WorkerClient<I, V, E> workerClient;
/** IPC Server */
@@ -182,6 +186,11 @@ public class BspServiceWorker<I extends WritableComparable,
throws IOException, InterruptedException {
super(context, graphTaskManager);
ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+ localData = new LocalData<>(conf);
+ translateEdge = getConfiguration().edgeTranslationInstance();
+ if (translateEdge != null) {
+ translateEdge.initialize(this);
+ }
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
workerGraphPartitioner =
@@ -237,6 +246,14 @@ public class BspServiceWorker<I extends WritableComparable,
return workerClient;
}
+ public LocalData<I, V, E, ? extends Writable> getLocalData() {
+ return localData;
+ }
+
+ public TranslateEdge<I, E> getTranslateEdge() {
+ return translateEdge;
+ }
+
/**
* Intended to check the health of the node. For instance, can it ssh,
* dmesg, etc. For now, does nothing.
@@ -293,6 +310,55 @@ public class BspServiceWorker<I extends WritableComparable,
return vertexEdgeCount;
}
+ /**
+ * Load the mapping entries from the user-defined
+ * {@link org.apache.giraph.io.MappingReader}
+ *
+ * @return Count of mapping entries loaded, summed over all loader threads
+ */
+ private Integer loadMapping() throws KeeperException,
+ InterruptedException {
+ List<String> inputSplitPathList =
+ getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
+ false, false, true);
+
+ InputSplitPathOrganizer splitOrganizer =
+ new InputSplitPathOrganizer(getZkExt(),
+ inputSplitPathList, getWorkerInfo().getHostname(),
+ getConfiguration().useInputSplitLocality());
+
+ MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
+ mappingInputSplitsCallableFactory =
+ new MappingInputSplitsCallableFactory<>(
+ getConfiguration().createWrappedMappingInputFormat(),
+ splitOrganizer,
+ getContext(),
+ getConfiguration(),
+ this,
+ getZkExt());
+
+ int entriesLoaded = 0;
+ // Never use more threads than there are mapping input splits
+ int maxInputSplitThreads = inputSplitPathList.size();
+ int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+ maxInputSplitThreads);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadMapping: Using " + numThreads + " thread(s), " +
+ "originally " + getConfiguration().getNumInputSplitsThreads() +
+ " thread(s) for " + inputSplitPathList.size() + " total splits.");
+ }
+
+ List<Integer> results =
+ ProgressableUtils.getResultsWithNCallables(
+ mappingInputSplitsCallableFactory,
+ numThreads, "load-mapping-%d", getContext());
+ for (Integer result : results) {
+ entriesLoaded += result;
+ }
+ // After all threads finish loading - call postFilling on the mapping store
+ localData.getMappingStore().postFilling();
+ return entriesLoaded;
+ }
/**
* Load the vertices from the user-defined
@@ -403,13 +469,15 @@ public class BspServiceWorker<I extends WritableComparable,
}
/**
- * Wait for all workers to finish processing input splits.
+ * Mark current worker as done and then wait for all workers
+ * to finish processing input splits.
*
* @param inputSplitPaths Input split paths
* @param inputSplitEvents Input split events
*/
- private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
+ private void markCurrentWorkerDoneThenWaitForOthers(
+ InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents) {
String workerInputSplitsDonePath =
inputSplitPaths.getDonePath() + "/" +
getWorkerInfo().getHostnameId();
@@ -420,10 +488,12 @@ public class BspServiceWorker<I extends WritableComparable,
CreateMode.PERSISTENT,
true);
} catch (KeeperException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
+ throw new IllegalStateException(
+ "markCurrentWorkerDoneThenWaitForOthers: " +
"KeeperException creating worker done splits", e);
} catch (InterruptedException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
+ throw new IllegalStateException(
+ "markCurrentWorkerDoneThenWaitForOthers: " +
"InterruptedException creating worker done splits", e);
}
while (true) {
@@ -433,10 +503,12 @@ public class BspServiceWorker<I extends WritableComparable,
getZkExt().exists(inputSplitPaths.getAllDonePath(),
true);
} catch (KeeperException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
+ throw new IllegalStateException(
+ "markCurrentWorkerDoneThenWaitForOthers: " +
"KeeperException waiting on worker done splits", e);
} catch (InterruptedException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
+ throw new IllegalStateException(
+ "markCurrentWorkerDoneThenWaitForOthers: " +
"InterruptedException waiting on worker done splits", e);
}
if (inputSplitsDoneStat != null) {
@@ -501,6 +573,37 @@ public class BspServiceWorker<I extends WritableComparable,
aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
VertexEdgeCount vertexEdgeCount;
+ int entriesLoaded = 0;
+
+ if (getConfiguration().hasMappingInputFormat()) {
+ // Ensure the mapping InputSplits are ready for processing
+ ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
+ getContext().progress();
+ try {
+ entriesLoaded = loadMapping();
+ // successfully loaded mapping
+ // now initialize graphPartitionerFactory with this data
+ getGraphPartitionerFactory().initialize(localData);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: loadMapping failed with InterruptedException", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: loadMapping failed with KeeperException", e);
+ }
+ getContext().progress();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Finally loaded a total of " +
+ entriesLoaded + " entries from inputSplits");
+ }
+
+ // Workers wait for each other to finish, coordinated by master
+ markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
+ mappingInputSplitsEvents);
+ // Print stats for data stored in localData once mapping is fully
+ // loaded on all the workers
+ localData.printStats();
+ }
if (getConfiguration().hasVertexInputFormat()) {
// Ensure the vertex InputSplits are ready for processing
@@ -544,12 +647,14 @@ public class BspServiceWorker<I extends WritableComparable,
if (getConfiguration().hasVertexInputFormat()) {
// Workers wait for each other to finish, coordinated by master
- waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+ markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
+ vertexInputSplitsEvents);
}
if (getConfiguration().hasEdgeInputFormat()) {
// Workers wait for each other to finish, coordinated by master
- waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+ markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
+ edgeInputSplitsEvents);
}
// Create remaining partitions owned by this worker.
@@ -569,6 +674,8 @@ public class BspServiceWorker<I extends WritableComparable,
getServerData().getEdgeStore().moveEdgesToVertices();
}
+ localData.removeMappingStoreIfPossible();
+
// Generate the partition stats for the input superstep and process
// if necessary
List<PartitionStats> partitionStatsList =
@@ -726,6 +833,17 @@ public class BspServiceWorker<I extends WritableComparable,
getGraphTaskManager().getGraphFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("startSuperstep: addressesAndPartitions" +
+ addressesAndPartitions.getWorkerInfos());
+ for (PartitionOwner partitionOwner : addressesAndPartitions
+ .getPartitionOwners()) {
+ LOG.debug(partitionOwner.getPartitionId() + " " +
+ partitionOwner.getWorkerInfo());
+ }
+ }
+
return addressesAndPartitions.getPartitionOwners();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 828eac4..35ad94b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -48,6 +48,7 @@ import java.io.IOException;
* @param <V> Vertex value
* @param <E> Edge value
*/
+@SuppressWarnings("unchecked")
public class EdgeInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable>
extends InputSplitsCallable<I, V, E> {
@@ -62,10 +63,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
/** Aggregator handler */
private final WorkerThreadAggregatorUsage aggregatorUsage;
+ /** Bsp service worker (only use thread-safe methods) */
+ private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Edge input format */
private final EdgeInputFormat<I, E> edgeInputFormat;
/** Input split max edges (-1 denotes all) */
private final long inputSplitMaxEdges;
+ /** Whether the mapping store ops can embed info in vertex ids */
+ private final boolean canEmbedInIds;
/** Filter to use */
private final EdgeInputFilter<I, E> edgeInputFilter;
@@ -97,11 +102,19 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
zooKeeperExt);
this.edgeInputFormat = edgeInputFormat;
+ this.bspServiceWorker = bspServiceWorker;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
// Initialize aggregator usage.
this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
.newThreadAggregatorUsage();
edgeInputFilter = configuration.getEdgeInputFilter();
+ canEmbedInIds = bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps() != null &&
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .hasEmbedding();
// Initialize Metrics
totalEdgesMeter = getTotalEdgesLoadedMeter();
@@ -157,6 +170,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
"readInputSplit: Edge reader returned an edge " +
"without a value! - " + readerEdge);
}
+ if (canEmbedInIds) {
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .embedTargetInfo(sourceId);
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .embedTargetInfo(readerEdge.getTargetVertexId());
+ }
++inputSplitEdgesLoaded;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
new file mode 100644
index 0000000..4e93ce0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * InputSplitCallable to read all the splits
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+public abstract class FullInputSplitCallable<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements Callable<Integer> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(
+ FullInputSplitCallable.class);
+ /** Class time object */
+ private static final Time TIME = SystemTime.get();
+ /** Configuration */
+ protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+ /** Context */
+ protected final Mapper<?, ?, ?, ?>.Context context;
+
+ /** The List of InputSplit znode paths */
+ private final List<String> pathList;
+ /** Current position in the path list */
+ private final AtomicInteger currentIndex;
+ /** ZooKeeperExt handle */
+ private final ZooKeeperExt zooKeeperExt;
+ /** Get the start time in nanos */
+ private final long startNanos = TIME.getNanoseconds();
+
+ // CHECKSTYLE: stop ParameterNumberCheck
+ /**
+ * Constructor.
+
+ * @param splitOrganizer Input splits organizer
+ * @param context Context
+ * @param configuration Configuration
+ * @param zooKeeperExt Handle to ZooKeeperExt
+ * @param currentIndex Atomic Integer to get splitPath from list
+ */
+ public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
+ Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ ZooKeeperExt zooKeeperExt,
+ AtomicInteger currentIndex) {
+ this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
+ this.currentIndex = currentIndex;
+ this.zooKeeperExt = zooKeeperExt;
+ this.context = context;
+ this.configuration = configuration;
+ }
+ // CHECKSTYLE: resume ParameterNumberCheck
+
+ /**
+ * Get input format
+ *
+ * @return Input format
+ */
+ public abstract GiraphInputFormat getInputFormat();
+
+ /**
+ * Load mapping entries from all the given input splits
+ *
+ * @param inputSplit Input split to load
+ * @return Count of vertices and edges loaded
+ * @throws java.io.IOException
+ * @throws InterruptedException
+ */
+ protected abstract Integer readInputSplit(InputSplit inputSplit)
+ throws IOException, InterruptedException;
+
+ @Override
+ public Integer call() {
+ int entries = 0;
+ String inputSplitPath;
+ int inputSplitsProcessed = 0;
+ try {
+ while (true) {
+ int pos = currentIndex.getAndIncrement();
+ if (pos >= pathList.size()) {
+ break;
+ }
+ inputSplitPath = pathList.get(pos);
+ entries += loadInputSplit(inputSplitPath);
+ context.progress();
+ ++inputSplitsProcessed;
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("call: InterruptedException", e);
+ } catch (IOException e) {
+ throw new IllegalStateException("call: IOException", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("call: ClassNotFoundException", e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("call: InstantiationException", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("call: IllegalAccessException", e);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ float seconds = Times.getNanosSince(TIME, startNanos) /
+ Time.NS_PER_SECOND_AS_FLOAT;
+ float entriesPerSecond = entries / seconds;
+ LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+ "input splits in " + seconds + " secs, " + entries +
+ " " + entriesPerSecond + " entries/sec");
+ }
+ return entries;
+ }
+
+ /**
+ * Extract entries from input split, saving them into mapping store.
+ * Mark the input split finished when done.
+ *
+ * @param inputSplitPath ZK location of input split
+ * @return Number of entries read in this input split
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ */
+ private Integer loadInputSplit(
+ String inputSplitPath)
+ throws IOException, ClassNotFoundException, InterruptedException,
+ InstantiationException, IllegalAccessException {
+ InputSplit inputSplit = getInputSplit(inputSplitPath);
+ Integer entriesRead = readInputSplit(inputSplit);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadFromInputSplit: Finished loading " +
+ inputSplitPath + " " + entriesRead);
+ }
+ return entriesRead;
+ }
+
+ /**
+ * Talk to ZooKeeper to convert the input split path to the actual
+ * InputSplit.
+ *
+ * @param inputSplitPath Location in ZK of input split
+ * @return instance of InputSplit
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ protected InputSplit getInputSplit(String inputSplitPath)
+ throws IOException, ClassNotFoundException {
+ byte[] splitList;
+ try {
+ splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "getInputSplit: KeeperException on " + inputSplitPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "getInputSplit: IllegalStateException on " + inputSplitPath, e);
+ }
+ context.progress();
+
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(splitList));
+ InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getInputSplit: Processing " + inputSplitPath +
+ " from ZooKeeper and got input split '" +
+ inputSplit.toString() + "'");
+ }
+ return inputSplit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
new file mode 100644
index 0000000..9612344
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Stores LocalData for each worker
+ *
+ * Holds the mapping store and the mapping store operations object created
+ * from the configuration; either of them may be null.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertex value type
+ * @param <E> edge value type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class LocalData<I extends WritableComparable, V extends Writable,
+    E extends Writable, B extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+  /** Logger Instance */
+  private static final Logger LOG = Logger.getLogger(LocalData.class);
+  /** MappingStore from vertexId - target */
+  private MappingStore<I, B> mappingStore;
+  /** Do operations using mapping store */
+  private MappingStoreOps<I, B> mappingStoreOps;
+
+  /**
+   * Constructor
+   *
+   * Set configuration, create & initialize mapping store
+   * @param conf giraph configuration
+   */
+  public LocalData(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // set configuration
+    setConf(conf);
+    // check if user set the mapping store => create & initialize it
+    mappingStore = (MappingStore<I, B>) getConf().createMappingStore();
+    if (mappingStore != null) {
+      mappingStore.initialize();
+    }
+    mappingStoreOps = (MappingStoreOps<I, B>) getConf().createMappingStoreOps();
+    if (mappingStoreOps != null) {
+      mappingStoreOps.initialize(mappingStore);
+    }
+  }
+
+  /**
+   * Get the mapping store
+   *
+   * @return mapping store, or null if none was created or it has been
+   *         removed by {@link #removeMappingStoreIfPossible()}
+   */
+  public MappingStore<I, B> getMappingStore() {
+    return mappingStore;
+  }
+
+  /**
+   * Get the mapping store operations object
+   *
+   * @return mapping store ops, or null if none was created
+   */
+  public MappingStoreOps<I, B> getMappingStoreOps() {
+    return mappingStoreOps;
+  }
+
+  /**
+   * Remove mapping store from localData
+   * if mapping data is already embedded into vertexIds
+   */
+  public void removeMappingStoreIfPossible() {
+    if (mappingStoreOps != null && mappingStoreOps.hasEmbedding()) {
+      mappingStore = null;
+    }
+  }
+
+  /**
+   * Prints Stats of individual data it stores
+   *
+   * Nothing is printed when there is no mapping store.
+   */
+  public void printStats() {
+    // mappingStore is null when none was created or after it was removed
+    if (mappingStore != null && LOG.isInfoEnabled()) {
+      LOG.info("MappingStore has : " + mappingStore.getStats() + " entries");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
new file mode 100644
index 0000000..a2279a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Callable that reads mapping input splits and adds every entry it reads
+ * to the mapping store held by the worker's local data.
+ * A new thread aggregator usage is requested from the worker's aggregator
+ * handler for each input split that is read.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class MappingInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends FullInputSplitCallable<I, V, E> {
+  /** Input format that creates the mapping readers */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Worker service giving access to aggregators and local data */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+
+  /**
+   * Constructor
+   *
+   * @param mappingInputFormat mappingInputFormat
+   * @param splitOrganizer Input splits organizer
+   * @param context Context
+   * @param configuration Configuration
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param currentIndex Atomic Integer to get splitPath from list
+   * @param bspServiceWorker bsp service worker
+   */
+  public MappingInputSplitsCallable(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      ZooKeeperExt zooKeeperExt,
+      AtomicInteger currentIndex,
+      BspServiceWorker<I, V, E> bspServiceWorker) {
+    super(splitOrganizer, context, configuration, zooKeeperExt,
+        currentIndex);
+    this.bspServiceWorker = bspServiceWorker;
+    this.mappingInputFormat = mappingInputFormat;
+  }
+
+  @Override
+  public GiraphInputFormat getInputFormat() {
+    return mappingInputFormat;
+  }
+
+  /**
+   * Read every mapping entry of the split into the mapping store.
+   *
+   * @param inputSplit Input split to load
+   * @return Number of entries added to the mapping store
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  protected Integer readInputSplit(InputSplit inputSplit)
+    throws IOException, InterruptedException {
+    // Reader set-up: configuration first, then initialize, then aggregators
+    MappingReader<I, V, E, B> reader =
+        mappingInputFormat.createMappingReader(inputSplit, context);
+    reader.setConf(configuration);
+    WorkerThreadAggregatorUsage threadAggregators =
+        bspServiceWorker.getAggregatorHandler().newThreadAggregatorUsage();
+    reader.initialize(inputSplit, context);
+    reader.setWorkerAggregatorUse(threadAggregators);
+
+    MappingStore<I, B> store =
+        (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
+
+    int loaded = 0;
+    while (reader.nextEntry()) {
+      MappingEntry<I, B> current = reader.getCurrentEntry();
+      store.addEntry(current.getVertexId(), current.getMappingTarget());
+      ++loaded;
+    }
+    return loaded;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
new file mode 100644
index 0000000..21a981e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
+ *
+ * All callables created by one factory are handed the same
+ * {@link AtomicInteger} position counter, which the factory creates at zero.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingInputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    implements CallableFactory<Integer> {
+  /** Mapping input format */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Input split organizer */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Position counter passed to every callable this factory creates */
+  private final AtomicInteger currentIndex;
+
+  /**
+   * Constructor.
+   *
+   * @param mappingInputFormat Mapping input format
+   * @param splitOrganizer Input split organizer
+   * @param context Mapper context
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
+   *                     for this worker
+   */
+  public MappingInputSplitsCallableFactory(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
+      ZooKeeperExt zooKeeperExt) {
+    this.currentIndex = new AtomicInteger(0);
+    this.zooKeeperExt = zooKeeperExt;
+    this.bspServiceWorker = bspServiceWorker;
+    this.configuration = configuration;
+    this.context = context;
+    this.splitOrganizer = splitOrganizer;
+    this.mappingInputFormat = mappingInputFormat;
+  }
+
+  /**
+   * Create a new callable; the thread id is not used.
+   *
+   * @param threadId Id of the thread the callable is created for
+   * @return New {@link MappingInputSplitsCallable}
+   */
+  @Override
+  public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+    return new MappingInputSplitsCallable<I, V, E, B>(
+        mappingInputFormat, splitOrganizer, context, configuration,
+        zooKeeperExt, currentIndex, bspServiceWorker);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index e3e04d6..4c85765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -19,12 +19,15 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
@@ -50,6 +53,7 @@ import java.io.IOException;
* @param <V> Vertex value
* @param <E> Edge value
*/
+@SuppressWarnings("unchecked")
public class VertexInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable>
extends InputSplitsCallable<I, V, E> {
@@ -69,6 +73,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Filter to select which vertices to keep */
private final VertexInputFilter<I, V, E> vertexInputFilter;
+ /** Can embedInfo in vertexIds */
+ private final boolean canEmbedInIds;
+ /**
+ * Whether the chosen {@link OutEdges} implementation allows for Edge
+ * reuse.
+ */
+ private boolean reuseEdgeObjects;
+ /** Used to translate Edges during vertex input phase based on localData */
+ private final TranslateEdge<I, E> translateEdge;
// Metrics
/** number of vertices loaded meter across all readers */
@@ -102,6 +115,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
this.bspServiceWorker = bspServiceWorker;
vertexInputFilter = configuration.getVertexInputFilter();
+ reuseEdgeObjects = configuration.reuseEdgeObjects();
+ canEmbedInIds = bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps() != null &&
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .hasEmbedding();
+ translateEdge = bspServiceWorker.getTranslateEdge();
// Initialize Metrics
totalVerticesMeter = getTotalVerticesLoadedMeter();
@@ -151,6 +173,12 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
"readInputSplit: Vertex reader returned a vertex " +
"without an id! - " + readerVertex);
}
+ if (canEmbedInIds) {
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .embedTargetInfo(readerVertex.getId());
+ }
if (readerVertex.getValue() == null) {
readerVertex.setValue(configuration.createVertexValue());
}
@@ -167,6 +195,37 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
continue;
}
+ // Before saving to partition-store translate all edges (if present)
+ if (translateEdge != null) {
+ // only iff vertexInput reads edges also
+ if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
+ OutEdges<I, E> vertexOutEdges = configuration
+ .createAndInitializeOutEdges(readerVertex.getNumEdges());
+ // TODO : this works for generic OutEdges, can create a better api
+ // to support more efficient translation for specific types
+
+ // NOTE : for implementations where edge is reusable, space is
+ // consumed by the OutEdges data structure itself, but if not reusable
+ // space is consumed by the newly created edge -> and the new OutEdges
+ // data structure just holds a reference to the newly created edge
+ // so in any way we virtually hold edges twice - similar to
+ // OutEdges.trim() -> this has the same complexity as OutEdges.trim()
+ for (Edge<I, E> edge : readerVertex.getEdges()) {
+ if (reuseEdgeObjects) {
+ bspServiceWorker
+ .getLocalData()
+ .getMappingStoreOps()
+ .embedTargetInfo(edge.getTargetVertexId());
+ vertexOutEdges.add(edge); // edge can be re-used
+ } else { // edge objects cannot be reused - so create new edges
+ vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
+ }
+ }
+ // set out edges to translated instance -> old instance is released
+ readerVertex.setEdges(vertexOutEdges);
+ }
+ }
+
PartitionOwner partitionOwner =
bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
workerClientRequestProcessor.sendVertexRequest(
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
index 4e19cd2..96bd5d7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.giraph.partition;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,6 +48,7 @@ public class SimpleRangePartitionFactoryTest {
ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
for (int i = 0; i < numWorkers; i++) {
WorkerInfo info = new WorkerInfo();
+ info.setInetSocketAddress(new InetSocketAddress(8080));
info.setTaskId(i);
infos.add(info);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 603910b..fbc24f8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -31,6 +31,8 @@ import org.apache.giraph.graph.Computation;
import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.mapping.HiveMappingInputFormat;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
import org.apache.giraph.hive.input.vertex.HiveToVertex;
import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
import org.apache.giraph.hive.output.HiveVertexOutputFormat;
@@ -55,6 +57,7 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
@@ -87,6 +90,8 @@ public class HiveGiraphRunner implements Tool {
private List<EdgeInputFormatDescription> edgeInputDescriptions =
Lists.newArrayList();
+ /** Hive Mapping reader */
+ private Class<? extends HiveToMapping> hiveToMappingClass;
/** Hive Vertex writer */
private Class<? extends VertexToHive> vertexToHiveClass;
/** Skip output? (Useful for testing without writing) */
@@ -238,6 +243,36 @@ public class HiveGiraphRunner implements Tool {
}
/**
+ * Check if mapping input is set
+ *
+ * @return true if mapping input is set
+ */
+ public boolean hasMappingInput() {
+ return hiveToMappingClass != null;
+ }
+
+ /**
+ * Set mapping input
+ *
+ * @param hiveToMappingClass class for reading mapping entries from Hive.
+ * @param tableName Table name
+ * @param partitionFilter Partition filter, or null if no filter used
+ */
+ public void setMappingInput(
+     Class<? extends HiveToMapping> hiveToMappingClass, String tableName,
+     String partitionFilter) {
+   // Remember the reader class so hasMappingInput() reports it as set
+   this.hiveToMappingClass = hiveToMappingClass;
+
+   String classKey = HIVE_MAPPING_INPUT.getClassOpt().getKey();
+   conf.set(classKey, hiveToMappingClass.getName());
+
+   String profileKey = HIVE_MAPPING_INPUT.getProfileIdOpt().getKey();
+   conf.set(profileKey, "mapping_input_profile");
+
+   String tableKey = HIVE_MAPPING_INPUT.getTableOpt().getKey();
+   conf.set(tableKey, tableName);
+
+   // The partition filter is optional; leave the option unset without one
+   if (partitionFilter == null) {
+     return;
+   }
+   conf.set(HIVE_MAPPING_INPUT.getPartitionOpt().getKey(), partitionFilter);
+ }
+
+ /**
* main method
* @param args system arguments
* @throws Exception any errors from Hive Giraph Runner
@@ -425,6 +460,22 @@ public class HiveGiraphRunner implements Tool {
}
/**
+ * Prepare input settings in Configuration
+ *
+ * This caches metadata information into the configuration to eliminate worker
+ * access to the metastore.
+ */
+ public void prepareHiveMappingInput() {
+   // Register the Hive mapping input format on the runner's configuration
+   GiraphConstants.MAPPING_INPUT_FORMAT_CLASS.set(
+       conf, HiveMappingInputFormat.class);
+
+   // Input specs are checked against a copy of the configuration
+   Configuration specCheckConf = new Configuration(conf);
+   createGiraphConf(specCheckConf)
+       .createWrappedMappingInputFormat()
+       .checkInputSpecs(specCheckConf);
+ }
+
+ /**
* process arguments
* @param args to process
* @return CommandLine instance
@@ -458,6 +509,17 @@ public class HiveGiraphRunner implements Tool {
" class name (-computationClass) to use");
}
+ String mappingInput = cmdln.getOptionValue("mappingInput");
+ if (mappingInput != null) {
+ String[] parameters = split(mappingInput, ",", 3);
+ if (parameters.length < 2) {
+ throw new IllegalStateException("Illegal mappingInput description " +
+ mappingInput + " - HiveToMapping class and table name needed");
+ }
+ setMappingInput(findClass(parameters[0], HiveToMapping.class),
+ parameters[1], elementOrNull(parameters, 2));
+ }
+
String[] vertexInputs = cmdln.getOptionValues("vertexInput");
if (vertexInputs != null && vertexInputs.length != 0) {
vertexInputDescriptions.clear();
@@ -534,6 +596,11 @@ public class HiveGiraphRunner implements Tool {
// allow metastore changes (i.e. creating tables that don't exist)
processMoreArguments(cmdln);
+ if (mappingInput != null) { // mapping input is provided
+ HIVE_MAPPING_INPUT.getDatabaseOpt().set(conf, dbName);
+ prepareHiveMappingInput();
+ }
+
if (hasVertexInput()) {
HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, dbName);
prepareHiveVertexInputs();
@@ -573,6 +640,12 @@ public class HiveGiraphRunner implements Tool {
options.addOption("db", "dbName", true, "Hive database name");
+ // Mapping input settings
+ options.addOption("mi", "mappingInput", true, "Giraph " +
+ HiveToMapping.class.getSimpleName() + " class to use, table name and " +
+ "partition filter (optional). Example:\n" +
+ "\"MyHiveToMapping, myTableName, a=1,b=two");
+
// Vertex input settings
options.addOption("vi", "vertexInput", true, getInputOptionDescription(
"vertex", HiveToVertex.class.getSimpleName()));
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
index c7ad63b..ab533a2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
import org.apache.giraph.hive.input.edge.HiveToEdge;
import org.apache.giraph.hive.input.vertex.HiveToVertex;
import org.apache.giraph.hive.output.VertexToHive;
@@ -28,6 +29,9 @@ import org.apache.giraph.hive.output.VertexToHive;
* Constants for giraph-hive
*/
public class GiraphHiveConstants {
+ /** Options for configuring mapping input */
+ public static final HiveInputOptions<HiveToMapping> HIVE_MAPPING_INPUT =
+ new HiveInputOptions<>("mapping", HiveToMapping.class);
/** Options for configuring vertex input */
public static final HiveInputOptions<HiveToVertex> HIVE_VERTEX_INPUT =
new HiveInputOptions<HiveToVertex>("vertex", HiveToVertex.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index 2388673..35d8b3e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
import org.apache.giraph.hive.input.edge.HiveToEdge;
import org.apache.giraph.hive.input.vertex.HiveToVertex;
import org.apache.giraph.hive.output.VertexToHive;
@@ -46,12 +47,14 @@ import java.util.Map;
import static java.lang.System.getenv;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
/**
* Utility methods for Hive IO
*/
+@SuppressWarnings("unchecked")
public class HiveUtils {
/** Logger */
private static final Logger LOG = Logger.getLogger(HiveUtils.class);
@@ -342,6 +345,31 @@ public class HiveUtils {
}
/**
+ * Create a new HiveToMapping
+ *
+ * @param conf ImmutableClassesGiraphConfiguration
+ * @param schema HiveTableSchema
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ * @return HiveToMapping
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, B extends Writable>
+ HiveToMapping<I, B> newHiveToMapping(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ HiveTableSchema schema) {
+ // Look up the HiveToMapping implementation configured for mapping input
+ Class<? extends HiveToMapping> mappingClass =
+ HIVE_MAPPING_INPUT.getClass(conf);
+ if (mappingClass != null) {
+ return newInstance(mappingClass, conf, schema);
+ }
+ // Nothing configured: report the key of the option that is missing
+ throw new IllegalArgumentException(
+ HIVE_MAPPING_INPUT.getClassOpt().getKey() + " not set in conf");
+ }
+
+ /**
* Create a new instance of a class, configuring it and setting the Hive table
* schema if it supports those types.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
new file mode 100644
index 0000000..dc7a6ee
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base class for {@link HiveToMapping} implementations. It fixes
+ * {@link #remove()} to always throw {@link UnsupportedOperationException}.
+ *
+ * @param <I> vertexId type parameter
+ * @param <B> mapping target type parameter
+ */
+public abstract class AbstractHiveToMapping<I extends WritableComparable,
+ B extends Writable>
+ extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+ implements HiveToMapping<I, B> {
+ @Override
+ public final void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
new file mode 100644
index 0000000..973813d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.io.iterables.MappingReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.giraph.hive.common.HiveUtils.newHiveToMapping;
+
+/**
+ * {@link MappingInputFormat} that reads mapping entries from a Hive table,
+ * delegating splits and record reading to {@link HiveApiInputFormat}.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable, B extends Writable>
+ extends MappingInputFormat<I, V, E, B> {
+ /** Underlying Hive InputFormat used */
+ private final HiveApiInputFormat hiveInputFormat;
+
+ /**
+ * Create mapping input format
+ */
+ public HiveMappingInputFormat() {
+ hiveInputFormat = new HiveApiInputFormat();
+ }
+
+ @Override
+ public void checkInputSpecs(Configuration conf) {
+ HiveInputDescription inputDesc =
+ GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf);
+ HiveTableSchema schema = getTableSchema();
+ // Let the configured HiveToMapping validate the input it will read
+ HiveToMapping<I, B> hiveToMapping = newHiveToMapping(getConf(), schema);
+ hiveToMapping.checkInput(inputDesc, schema);
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ super.setConf(conf);
+ // Initialize the Hive input format with the mapping input description
+ // and profile id taken from conf
+ hiveInputFormat.initialize(
+ GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf),
+ GiraphHiveConstants.HIVE_MAPPING_INPUT.getProfileID(conf),
+ conf);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+ throws IOException, InterruptedException {
+ // minSplitCountHint is not used; splits come from the Hive input format
+ return hiveInputFormat.getSplits(context);
+ }
+
+ @Override
+ public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ HiveMappingReader<I, B> reader = new HiveMappingReader<>();
+ reader.setTableSchema(getTableSchema());
+
+ RecordReader<WritableComparable, HiveReadableRecord> baseReader;
+ try {
+ baseReader = hiveInputFormat.createRecordReader(split, context);
+ } catch (InterruptedException e) {
+ // Restore interrupt status, which wrapping in IOException would hide
+ Thread.currentThread().interrupt();
+ throw new IOException("Could not create map reader", e);
+ }
+
+ reader.setHiveRecordReader(baseReader);
+ return new MappingReaderWrapper<>(reader);
+ }
+
+ /**
+ * Get Hive table schema
+ *
+ * @return Hive table schema
+ */
+ private HiveTableSchema getTableSchema() {
+ return hiveInputFormat.getTableSchema(getConf());
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
new file mode 100644
index 0000000..3154f9d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
+import org.apache.giraph.io.iterables.GiraphReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Reader that produces mapping entries from Hive records by delegating
+ * iteration to a {@link HiveToMapping}.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingReader<I extends WritableComparable,
+ B extends Writable>
+ extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+ implements GiraphReader<MappingEntry<I, B>> {
+ /**
+ * Underlying Hive RecordReader used; supplied through
+ * setHiveRecordReader and dereferenced in initialize
+ */
+ private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
+ /** Hive To Mapping, created in initialize */
+ private HiveToMapping<I, B> hiveToMapping;
+
+ /**
+ * Get Hive record reader
+ *
+ * @return hiveRecordReader
+ */
+ public RecordReader<WritableComparable, HiveReadableRecord>
+ getHiveRecordReader() {
+ return hiveRecordReader;
+ }
+
+ public void setHiveRecordReader(
+ RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
+ this.hiveRecordReader = hiveRecordReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ hiveRecordReader.initialize(inputSplit, context);
+ hiveToMapping = HiveUtils.newHiveToMapping(getConf(), getTableSchema());
+ hiveToMapping.initializeRecords(
+ new RecordReaderWrapper<>(hiveRecordReader));
+ }
+
+ @Override
+ public void close() throws IOException {
+ hiveRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return hiveRecordReader.getProgress();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hiveToMapping.hasNext();
+ }
+
+
+ @Override
+ public MappingEntry<I, B> next() {
+ return hiveToMapping.next();
+ }
+
+ @Override
+ public void remove() {
+ hiveToMapping.remove();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
new file mode 100644
index 0000000..497b044
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.input.HiveInputChecker;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Converts Hive records into mapping entries. Implementations are iterators
+ * over {@link MappingEntry} and also act as a {@link HiveInputChecker}.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface HiveToMapping<I extends WritableComparable,
+ B extends Writable> extends
+ Iterator<MappingEntry<I, B>>, HiveInputChecker {
+ /**
+ * Set the records which contain mapping input data
+ *
+ * @param records Hive records
+ */
+ void initializeRecords(Iterator<HiveReadableRecord> records);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
new file mode 100644
index 0000000..feccc1f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Convenience base for {@link HiveToMapping} implementations that convert
+ * one Hive record at a time: subclasses only supply the vertex id and the
+ * mapping target for a record. The same {@link MappingEntry} instance is
+ * returned by every call to {@link #next()}.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public abstract class SimpleHiveToMapping<I extends WritableComparable,
+ B extends Writable> extends AbstractHiveToMapping<I, B> {
+ /** Source of Hive records, set by initializeRecords */
+ private Iterator<HiveReadableRecord> records;
+
+ /** Entry instance handed back by every call to next() */
+ private MappingEntry<I, B> reusableEntry;
+
+ /** Vertex id instance created once in initializeRecords */
+ private I reusableVertexId;
+ /** Mapping target instance created once in initializeRecords */
+ private B reusableMappingTarget;
+
+ /**
+ * Extract the vertex id from a Hive record
+ *
+ * @param record HiveReadableRecord to read from
+ * @return vertexId for that record
+ */
+ public abstract I getVertexId(HiveReadableRecord record);
+
+ /**
+ * Extract the mapping target from a Hive record
+ *
+ * @param record HiveReadableRecord to read from
+ * @return mappingTarget for that record
+ */
+ public abstract B getMappingTarget(HiveReadableRecord record);
+
+ @Override
+ public void initializeRecords(Iterator<HiveReadableRecord> records) {
+ this.records = records;
+ // Create the reusable objects once, from the configuration
+ this.reusableVertexId = getConf().createVertexId();
+ this.reusableMappingTarget = (B) getConf().createMappingTarget();
+ this.reusableEntry =
+ new MappingEntry<>(this.reusableVertexId, this.reusableMappingTarget);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return records.hasNext();
+ }
+
+ @Override
+ public MappingEntry<I, B> next() {
+ HiveReadableRecord hiveRecord = records.next();
+ // Read both values before touching the shared entry
+ I vertexId = getVertexId(hiveRecord);
+ B mappingTarget = getMappingTarget(hiveRecord);
+ reusableEntry.setVertexId(vertexId);
+ reusableEntry.setMappingTarget(mappingTarget);
+ return reusableEntry;
+ }
+
+ /**
+ * Accessor for the reusable vertex id, for use in other methods
+ *
+ * @return reusableVertexId
+ */
+ public I getReusableVertexId() {
+ return reusableVertexId;
+ }
+
+ /**
+ * Accessor for the reusable mapping target, for use in other methods
+ *
+ * @return reusableMappingTarget
+ */
+ public B getReusableMappingTarget() {
+ return reusableMappingTarget;
+ }
+}