You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 20:28:49 UTC

[1/3] GIRAPH-908: support for partitioned input in giraph (pavanka)

Repository: giraph
Updated Branches:
  refs/heads/trunk 535a333b7 -> 4a133f576


http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
new file mode 100644
index 0000000..fc9f9d3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.input.parser.Records;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Long VertexId, Byte MappingTarget implementation of HiveToMapping
+ */
+public class LongByteHiveToMapping extends SimpleHiveToMapping<LongWritable,
+  ByteWritable> {
+
+  @Override
+  public void checkInput(HiveInputDescription inputDesc,
+    HiveTableSchema schema) {
+    Records.verifyType(0, HiveType.LONG, schema);
+    Records.verifyType(1, HiveType.BYTE, schema);
+  }
+
+  @Override
+  public LongWritable getVertexId(HiveReadableRecord record) {
+    LongWritable reusableId = getReusableVertexId();
+    reusableId.set(record.getLong(0));
+    return reusableId;
+  }
+
+  @Override
+  public ByteWritable getMappingTarget(HiveReadableRecord record) {
+    ByteWritable reusableTarget = getReusableMappingTarget();
+    reusableTarget.set(record.getByte(1));
+    return reusableTarget;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
new file mode 100644
index 0000000..617bc4f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.input.parser.Records;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Iterator;
+
+/**
+ * Long VertexId, Int mapping target -> Byte MappingTarget implementation
+ * of HiveToMapping. The input table has a long id and an int bucket
+ * value; the bucket is reduced modulo the max number of workers and
+ * stored as a byte.
+ */
+public class LongInt2ByteHiveToMapping extends SimpleHiveToMapping<LongWritable,
+  ByteWritable> {
+
+  /** Number of workers for the job (from getConf().getMaxWorkers()) */
+  private int numWorkers = 0;
+
+  @Override
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
+    super.initializeRecords(records);
+    numWorkers = getConf().getMaxWorkers();
+    if (numWorkers <= 0 || numWorkers >= 255) {
+      throw new IllegalStateException("#workers should be > 0 & < 255, " +
+          "got " + numWorkers);
+    }
+  }
+
+  @Override
+  public void checkInput(HiveInputDescription inputDesc,
+    HiveTableSchema schema) {
+    Records.verifyType(0, HiveType.LONG, schema);
+    Records.verifyType(1, HiveType.INT, schema);
+  }
+
+  @Override
+  public LongWritable getVertexId(HiveReadableRecord record) {
+    long id = record.getLong(0);
+    LongWritable reusableId = getReusableVertexId();
+    reusableId.set(id);
+    return reusableId;
+  }
+
+  @Override
+  public ByteWritable getMappingTarget(HiveReadableRecord record) {
+    int target = record.getInt(1);
+    ByteWritable reusableTarget = getReusableMappingTarget();
+    int bVal = target % numWorkers; // can be negative when target < 0
+    if ((bVal >>> 8) != 0) {
+      throw new IllegalStateException("target % numWorkers overflows " +
+          "byte range: target=" + target + ", numWorkers=" + numWorkers);
+    }
+    reusableTarget.set((byte) bVal);
+    return reusableTarget;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
new file mode 100644
index 0000000..41afed6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Sample implementations of HiveToMapping interface
+ */
+package org.apache.giraph.hive.input.mapping.examples;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
new file mode 100644
index 0000000..c7ad2a3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive Mapping input related classes
+ */
+package org.apache.giraph.hive.input.mapping;


[3/3] git commit: updated refs/heads/trunk to 4a133f5

Posted by pa...@apache.org.
GIRAPH-908: support for partitioned input in giraph (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a133f57
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a133f57
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a133f57

Branch: refs/heads/trunk
Commit: 4a133f5766c09362917e0416af503c0a00b24e87
Parents: 535a333
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 8 10:36:03 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 8 10:36:03 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/bsp/BspService.java  | 208 +++++++++++++-----
 .../giraph/bsp/CentralizedServiceMaster.java    |  10 +
 .../org/apache/giraph/conf/GiraphClasses.java   |  24 ++-
 .../org/apache/giraph/conf/GiraphConstants.java |  32 +++
 .../ImmutableClassesGiraphConfiguration.java    | 165 +++++++++++++++
 .../apache/giraph/io/MappingInputFormat.java    |  64 ++++++
 .../org/apache/giraph/io/MappingReader.java     | 124 +++++++++++
 .../io/internal/WrappedMappingInputFormat.java  |  99 +++++++++
 .../io/internal/WrappedMappingReader.java       | 105 ++++++++++
 .../io/iterables/MappingReaderWrapper.java      |  95 +++++++++
 .../giraph/mapping/AbstractLongByteOps.java     |  60 ++++++
 .../mapping/DefaultEmbeddedLongByteOps.java     |  73 +++++++
 .../giraph/mapping/DefaultLongByteOps.java      |  57 +++++
 .../giraph/mapping/LongByteMappingStore.java    | 143 +++++++++++++
 .../org/apache/giraph/mapping/MappingEntry.java |  62 ++++++
 .../org/apache/giraph/mapping/MappingStore.java |  70 +++++++
 .../apache/giraph/mapping/MappingStoreOps.java  |  72 +++++++
 .../org/apache/giraph/mapping/package-info.java |  23 ++
 .../translate/LongByteTranslateEdge.java        | 123 +++++++++++
 .../giraph/mapping/translate/TranslateEdge.java |  57 +++++
 .../giraph/mapping/translate/package-info.java  |  22 ++
 .../apache/giraph/master/BspServiceMaster.java  |  22 +-
 .../org/apache/giraph/master/MasterThread.java  |   5 +-
 .../partition/GraphPartitionerFactory.java      |  10 +-
 .../partition/HashPartitionerFactory.java       |  24 +--
 .../partition/HashRangePartitionerFactory.java  |  24 +--
 .../LongMappingStorePartitionerFactory.java     |  61 ++++++
 .../SimpleIntRangePartitionerFactory.java       |  18 +-
 .../SimpleLongRangePartitionerFactory.java      |  18 +-
 .../partition/SimplePartitionerFactory.java     |  37 ++--
 .../partition/SimpleWorkerPartitioner.java      |  34 ++-
 .../apache/giraph/worker/BspServiceWorker.java  | 138 +++++++++++-
 .../giraph/worker/EdgeInputSplitsCallable.java  |  23 ++
 .../giraph/worker/FullInputSplitCallable.java   | 210 +++++++++++++++++++
 .../org/apache/giraph/worker/LocalData.java     |  93 ++++++++
 .../worker/MappingInputSplitsCallable.java      | 109 ++++++++++
 .../MappingInputSplitsCallableFactory.java      |  96 +++++++++
 .../worker/VertexInputSplitsCallable.java       |  59 ++++++
 .../SimpleRangePartitionFactoryTest.java        |   2 +
 .../apache/giraph/hive/HiveGiraphRunner.java    |  73 +++++++
 .../giraph/hive/common/GiraphHiveConstants.java |   4 +
 .../apache/giraph/hive/common/HiveUtils.java    |  28 +++
 .../input/mapping/AbstractHiveToMapping.java    |  39 ++++
 .../input/mapping/HiveMappingInputFormat.java   | 116 ++++++++++
 .../hive/input/mapping/HiveMappingReader.java   | 100 +++++++++
 .../hive/input/mapping/HiveToMapping.java       |  44 ++++
 .../hive/input/mapping/SimpleHiveToMapping.java | 105 ++++++++++
 .../mapping/examples/LongByteHiveToMapping.java |  56 +++++
 .../examples/LongInt2ByteHiveToMapping.java     |  81 +++++++
 .../input/mapping/examples/package-info.java    |  22 ++
 .../giraph/hive/input/mapping/package-info.java |  22 ++
 52 files changed, 3239 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a0e94c1..37b94e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-908: support for partitioned input in giraph (pavanka)
+
   GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)  
 
   GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index ec0ddbb..2e35373 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -76,6 +76,25 @@ public abstract class BspService<I extends WritableComparable,
   public static final String BASE_DIR = "/_hadoopBsp";
   /** Master job state znode above base dir */
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
+
+  /** Mapping input split directory above base dir */
+  public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
+  /** Mapping input split done directory above base dir */
+  public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
+      "/_mappingInputSplitDoneDir";
+  /** Denotes a reserved mapping input split */
+  public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
+      "/_mappingInputSplitReserved";
+  /** Denotes a finished mapping input split */
+  public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
+      "/_mappingInputSplitFinished";
+  /** Denotes that all the mapping input splits are ready for consumption */
+  public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
+      "/_mappingInputSplitsAllReady";
+  /** Denotes that all the mapping input splits are done. */
+  public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
+      "/_mappingInputSplitsAllDone";
+
   /** Vertex input split directory about base dir */
   public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
   /** Vertex input split done directory about base dir */
@@ -93,6 +112,7 @@ public abstract class BspService<I extends WritableComparable,
   /** Denotes that all the vertex input splits are done. */
   public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
       "/_vertexInputSplitsAllDone";
+
   /** Edge input split directory about base dir */
   public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
   /** Edge input split done directory about base dir */
@@ -188,10 +208,14 @@ public abstract class BspService<I extends WritableComparable,
   protected final String basePath;
   /** Path to the job state determined by the master (informative only) */
   protected final String masterJobStatePath;
+  /** ZooKeeper paths for mapping input splits. */
+  protected final InputSplitPaths mappingInputSplitsPaths;
   /** ZooKeeper paths for vertex input splits. */
   protected final InputSplitPaths vertexInputSplitsPaths;
   /** ZooKeeper paths for edge input splits. */
   protected final InputSplitPaths edgeInputSplitsPaths;
+  /** Mapping input split events. */
+  protected final InputSplitEvents mappingInputSplitsEvents;
   /** Vertex input split events. */
   protected final InputSplitEvents vertexInputSplitsEvents;
   /** Edge input split events. */
@@ -263,6 +287,7 @@ public abstract class BspService<I extends WritableComparable,
   public BspService(
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
+    this.mappingInputSplitsEvents = new InputSplitEvents(context);
     this.vertexInputSplitsEvents = new InputSplitEvents(context);
     this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
@@ -313,6 +338,10 @@ public abstract class BspService<I extends WritableComparable,
     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
         basePath);
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
+    mappingInputSplitsPaths = new InputSplitPaths(basePath,
+        MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
+        MAPPING_INPUT_SPLITS_ALL_READY_NODE,
+        MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
     vertexInputSplitsPaths = new InputSplitPaths(basePath,
         VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
         VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
@@ -676,8 +705,6 @@ public abstract class BspService<I extends WritableComparable,
    * watches to see if the master commanded job state changes.
    *
    * @return Last job state or null if none
-   * @throws InterruptedException
-   * @throws KeeperException
    */
   public final JSONObject getJobState() {
     try {
@@ -784,8 +811,6 @@ public abstract class BspService<I extends WritableComparable,
    * Get the latest superstep and cache it.
    *
    * @return the latest superstep
-   * @throws InterruptedException
-   * @throws KeeperException
    */
   public final long getSuperstep() {
     if (cachedSuperstep != UNSET_SUPERSTEP) {
@@ -959,7 +984,125 @@ public abstract class BspService<I extends WritableComparable,
       }
       workerHealthRegistrationChanged.signal();
       eventProcessed = true;
+    } else if (processMappingEvent(event) || processVertexEvent(event) ||
+        processEdgeEvent(event)) {
+      return;
+    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: partitionAssignmentsReadyChanged " +
+            "(partitions are assigned)");
+      }
+      addressesAndPartitionsReadyChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: superstepFinished signaled");
+      }
+      superstepFinished.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(applicationAttemptsPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: applicationAttemptChanged signaled");
+      }
+      applicationAttemptChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: masterElectionChildrenChanged signaled");
+      }
+      masterElectionChildrenChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().equals(cleanedUpPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: cleanedUpChildrenChanged signaled");
+      }
+      cleanedUpChildrenChanged.signal();
+      eventProcessed = true;
+    }
+
+    if (!(processEvent(event)) && (!eventProcessed)) {
+      LOG.warn("process: Unknown and unprocessed event (path=" +
+          event.getPath() + ", type=" + event.getType() +
+          ", state=" + event.getState() + ")");
+    }
+  }
+
+  /**
+   * Process a WatchedEvent for mapping input splits.
+   *
+   * @param event watched event
+   * @return true if one of the mapping input split branches handled it
+   */
+  public final boolean processMappingEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
+        mappingInputSplitsPaths.getAllReadyPath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: mappingInputSplitsReadyChanged " +
+            "(input splits ready)");
+      }
+      mappingInputSplitsEvents.getAllReadyChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsStateChanged " +
+            "(made a reservation)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeDeleted)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: mappingInputSplitsStateChanged " +
+            "(lost a reservation)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsStateChanged " +
+            "(finished inputsplit)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsDoneStateChanged " +
+            "(worker finished sending)");
+      }
+      mappingInputSplitsEvents.getDoneStateChanged().signal();
+      eventProcessed = true;
     } else if (event.getPath().equals(
+        mappingInputSplitsPaths.getAllDonePath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: mappingInputSplitsAllDoneChanged " +
+            "(all entries sent from input splits)");
+      }
+      mappingInputSplitsEvents.getAllDoneChanged().signal();
+      eventProcessed = true;
+    }
+    return eventProcessed;
+  }
+
+  /**
+   * Process WatchedEvent for Vertex Inputsplits
+   *
+   * @param event watched event
+   * @return true if event processed
+   */
+  public final boolean processVertexEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
         vertexInputSplitsPaths.getAllReadyPath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
@@ -1009,7 +1152,19 @@ public abstract class BspService<I extends WritableComparable,
       }
       vertexInputSplitsEvents.getAllDoneChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().equals(
+    }
+    return eventProcessed;
+  }
+
+  /**
+   * Process WatchedEvent for Edge Inputsplits
+   *
+   * @param event watched event
+   * @return true if event processed
+   */
+  public final boolean processEdgeEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
         edgeInputSplitsPaths.getAllReadyPath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
@@ -1059,48 +1214,7 @@ public abstract class BspService<I extends WritableComparable,
       }
       edgeInputSplitsEvents.getAllDoneChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
-        event.getType() == EventType.NodeCreated) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: partitionAssignmentsReadyChanged " +
-            "(partitions are assigned)");
-      }
-      addressesAndPartitionsReadyChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
-        event.getType() == EventType.NodeCreated) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: superstepFinished signaled");
-      }
-      superstepFinished.signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(applicationAttemptsPath) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: applicationAttemptChanged signaled");
-      }
-      applicationAttemptChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: masterElectionChildrenChanged signaled");
-      }
-      masterElectionChildrenChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(cleanedUpPath) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: cleanedUpChildrenChanged signaled");
-      }
-      cleanedUpChildrenChanged.signal();
-      eventProcessed = true;
-    }
-
-    if (!(processEvent(event)) && (!eventProcessed)) {
-      LOG.warn("process: Unknown and unprocessed event (path=" +
-          event.getPath() + ", type=" + event.getType() +
-          ", state=" + event.getState() + ")");
     }
+    return eventProcessed;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index bda967d..e5b7cf3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -71,6 +71,16 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
 
   /**
    * Create the {@link BspInputSplit} objects from the index range based on the
+   * user-defined MappingInputFormat.  The {@link BspInputSplit} objects will
+   * be processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createMappingInputSplits();
+
+  /**
+   * Create the {@link BspInputSplit} objects from the index range based on the
    * user-defined VertexInputFormat.  The {@link BspInputSplit} objects will
    * processed by the workers later on during the INPUT_SUPERSTEP.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 3337621..e7b18aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
   /** Vertex output format class - cached for fast access */
   protected Class<? extends VertexOutputFormat<I, V, E>>
   vertexOutputFormatClass;
+  /** Mapping input format class - cached for fast access */
+  protected Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  mappingInputFormatClass;
   /** Edge input format class - cached for fast access */
   protected Class<? extends EdgeInputFormat<I, E>>
   edgeInputFormatClass;
@@ -113,8 +117,7 @@ public class GiraphClasses<I extends WritableComparable,
   /** Edge Input Filter class */
   protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
   /** Vertex Input Filter class */
-  protected Class<? extends VertexInputFilter<I, V, E>>
-  vertexInputFilterClass;
+  protected Class<? extends VertexInputFilter<I, V, E>> vertexInputFilterClass;
 
   /**
    * Empty constructor. Initialize with default classes or null.
@@ -181,6 +184,9 @@ public class GiraphClasses<I extends WritableComparable,
         EDGE_INPUT_FORMAT_CLASS.get(conf);
     edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
         EDGE_OUTPUT_FORMAT_CLASS.get(conf);
+    mappingInputFormatClass = (Class<? extends MappingInputFormat<I, V, E,
+        ? extends Writable>>)
+        MAPPING_INPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
     messageCombinerClass =
@@ -335,6 +341,15 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Report whether a MappingInputFormat class has been configured
+   *
+   * @return true if mappingInputFormatClass is non-null
+   */
+  public boolean hasMappingInputFormat() {
+    return this.mappingInputFormatClass != null;
+  }
+
+  /**
    * Get VertexOutputFormat set
    *
    * @return VertexOutputFormat
@@ -344,6 +359,11 @@ public class GiraphClasses<I extends WritableComparable,
     return vertexOutputFormatClass;
   }
 
+  public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  getMappingInputFormatClass() {
+    return this.mappingInputFormatClass; // null iff !hasMappingInputFormat()
+  }
+
   /**
    * Check if EdgeInputFormat is set
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 611a0dc..dd0c9ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -45,6 +45,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -56,6 +57,9 @@ import org.apache.giraph.job.DefaultJobObserver;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.job.HaltApplicationUtils;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
@@ -80,6 +84,30 @@ public interface GiraphConstants {
   /** 1KB in bytes */
   int ONE_KB = 1024;
 
+  /** MappingStore class used to store the vertex id mapping */
+  ClassConfOption<? extends MappingStore> MAPPING_STORE_CLASS =
+      ClassConfOption.create("giraph.mappingStoreClass", null,
+          MappingStore.class, "MappingStore Class");
+
+  /** Class to use for performing read operations on mapping store */
+  ClassConfOption<? extends MappingStoreOps> MAPPING_STORE_OPS_CLASS =
+      ClassConfOption.create("giraph.mappingStoreOpsClass", null,
+          MappingStoreOps.class, "MappingStoreOps class");
+
+  /** Upper value of LongByteMappingStore (defaults to -1) */
+  IntConfOption LB_MAPPINGSTORE_UPPER =
+      new IntConfOption("giraph.lbMappingStoreUpper", -1,
+          "'upper' value used by lbmappingstore");
+  /** Lower value of LongByteMappingStore (defaults to -1) */
+  IntConfOption LB_MAPPINGSTORE_LOWER =
+      new IntConfOption("giraph.lbMappingStoreLower", -1,
+          "'lower' value used by lbMappingstore");
+  /** Class used to conduct expensive edge translation during vertex input */
+  ClassConfOption<TranslateEdge> EDGE_TRANSLATION_CLASS =
+      ClassConfOption.create("giraph.edgeTranslationClass", null,
+          TranslateEdge.class, "Class used to conduct expensive edge " +
+              "translation during vertex input phase");
+
   /** Computation class - required */
   ClassConfOption<Computation> COMPUTATION_CLASS =
       ClassConfOption.create("giraph.computationClass", null,
@@ -230,6 +258,10 @@ public interface GiraphConstants {
   ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.edgeInputFormatClass", null,
           EdgeInputFormat.class, "EdgeInputFormat class");
+  /** MappingInputFormat class */
+  ClassConfOption<MappingInputFormat> MAPPING_INPUT_FORMAT_CLASS =
+      ClassConfOption.create(
+          "giraph.mappingInputFormatClass", null, MappingInputFormat.class,
+          "MappingInputFormat class");
 
   /** EdgeInputFilter class */
   ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index e9f50f9..3d7b3db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.conf;
 
+import com.google.common.base.Preconditions;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
@@ -39,12 +40,14 @@ import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
 import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
+import org.apache.giraph.io.internal.WrappedMappingInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -53,6 +56,9 @@ import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.job.GiraphJobRetryChecker;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.master.SuperstepClasses;
@@ -65,6 +71,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.utils.io.BigDataInputOutput;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.giraph.utils.io.ExtendedDataInputOutput;
@@ -91,6 +98,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     V extends Writable, E extends Writable> extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
+  /** Mapping target class */
+  private Class<? extends Writable> mappingTargetClass = null;
   /** Value (IVEMM) Factories */
   private final ValueFactories<I, V, E> valueFactories;
   /** Language values (IVEMM) are implemented in */
@@ -148,6 +157,27 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the class used for edge translation during vertex input
+   *
+   * @return edge translation class
+   */
+  public Class<? extends TranslateEdge> edgeTranslationClass() {
+    // Read the configured translation class (may be null when unset)
+    final Class<? extends TranslateEdge> translationClass =
+        EDGE_TRANSLATION_CLASS.get(this);
+    return translationClass;
+  }
+
+  /**
+   * Instance of TranslateEdge that contains helper method for edge translation
+   *
+   * @return instance of TranslateEdge, or null if none is configured
+   */
+  public TranslateEdge<I, E> edgeTranslationInstance() {
+    Class<? extends TranslateEdge> translationClass = edgeTranslationClass();
+    // No translation class configured: nothing to instantiate
+    if (translationClass == null) {
+      return null;
+    }
+    return ReflectionUtils.newInstance(translationClass, this);
+  }
+
+  /**
    * Get the vertex input filter class
    *
    * @return VertexInputFilter class
@@ -274,6 +304,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get MappingInputFormatClass
+   *
+   * @return MappingInputFormatClass
+   */
+  public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  getMappingInputFormatClass() {
+    // Delegate to the class holder built from the configuration
+    return this.classes.getMappingInputFormatClass();
+  }
+
+  /**
+   * Check if mappingInputFormat is set
+   *
+   * @return true if mappingInputFormat is set
+   */
+  public boolean hasMappingInputFormat() {
+    // Delegate to the class holder built from the configuration
+    return this.classes.hasMappingInputFormat();
+  }
+
+  /**
    * Create a user vertex output format class.
    * Note: Giraph should only use WrappedVertexOutputFormat,
    * which makes sure that Configuration parameters are set properly.
@@ -287,6 +336,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create a user mapping input format class.
+   * Note: Giraph should only use WrappedMappingInputFormat,
+   * which makes sure that Configuration parameters are set properly.
+   *
+   * @return Instantiated user mapping input format class
+   */
+  private MappingInputFormat<I, V, E, ? extends Writable>
+  createMappingInputFormat() {
+    return ReflectionUtils.newInstance(getMappingInputFormatClass(), this);
+  }
+
+  /**
    * Create a wrapper for user vertex output format,
    * which makes sure that Configuration parameters are set properly in all
    * methods related to this format.
@@ -300,6 +363,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     return wrappedVertexOutputFormat;
   }
 
+  /**
+   * Create a wrapper for user mapping input format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user mapping input format
+   */
+  public WrappedMappingInputFormat<I, V, E, ? extends Writable>
+  createWrappedMappingInputFormat() {
+    MappingInputFormat<I, V, E, ? extends Writable> userFormat =
+        createMappingInputFormat();
+    WrappedMappingInputFormat<I, V, E, ? extends Writable> wrapped =
+        new WrappedMappingInputFormat<>(userFormat);
+    // Hand this configuration to the wrapper so it can pass it downstream
+    configureIfPossible(wrapped);
+    return wrapped;
+  }
+
   @Override
   public boolean hasEdgeOutputFormat() {
     return classes.hasEdgeOutputFormat();
@@ -756,6 +835,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create a copy of an edge whose target id is translated, and whose value
+   * is cloned unless edge values are NullWritable, using translateEdge
+   *
+   * @param translateEdge instance of TranslateEdge
+   * @param edge edge to be translated
+   * @return translated edge
+   */
+  public Edge<I, E> createEdge(TranslateEdge<I, E>
+    translateEdge, Edge<I, E> edge) {
+    final I targetId = translateEdge.translateId(edge.getTargetVertexId());
+    if (!isEdgeValueNullWritable()) {
+      // Real edge values: keep a translated copy of the value
+      return EdgeFactory.create(targetId,
+        translateEdge.cloneValue(edge.getValue()));
+    }
+    // NullWritable edge values carry no data, so only the id is needed
+    return (Edge<I, E>) EdgeFactory.create(targetId);
+  }
+
+  /**
    * Create a reusable edge.
    *
    * @return Instantiated reusable edge.
@@ -856,6 +953,74 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get MappingStore class to be used
+   *
+   * @return MappingStore class set by user
+   */
+  public Class<? extends MappingStore> getMappingStoreClass() {
+    final Class<? extends MappingStore> storeClass =
+        MAPPING_STORE_CLASS.get(this);
+    return storeClass;
+  }
+
+  /**
+   * Create a {@link org.apache.giraph.mapping.MappingStore} instance
+   *
+   * @return MappingStore instance, or null if no MappingStore class is set
+   */
+  public MappingStore<I, ? extends Writable> createMappingStore() {
+    Class<? extends MappingStore> storeClass = getMappingStoreClass();
+    // No store class configured: no mapping store is used
+    if (storeClass == null) {
+      return null;
+    }
+    return ReflectionUtils.newInstance(storeClass, this);
+  }
+
+  /**
+   * Get MappingStoreOps class to be used
+   *
+   * @return MappingStoreOps class set by user
+   */
+  public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
+    final Class<? extends MappingStoreOps> opsClass =
+        MAPPING_STORE_OPS_CLASS.get(this);
+    return opsClass;
+  }
+
+  /**
+   * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance
+   *
+   * @return MappingStoreOps instance, or null if no class is set
+   */
+  public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
+    Class<? extends MappingStoreOps> opsClass = getMappingStoreOpsClass();
+    // No ops class configured: nothing to instantiate
+    if (opsClass == null) {
+      return null;
+    }
+    return ReflectionUtils.newInstance(opsClass, this);
+  }
+
+  /**
+   * Get mappingTarget class
+   *
+   * @return mappingTarget class
+   */
+  public Class<? extends Writable> getMappingTargetClass() {
+    if (mappingTargetClass == null) {
+      Class<? extends MappingStore> storeClass = getMappingStoreClass();
+      // The target type is derived from the MappingStore's type arguments,
+      // so a store class must be configured; fail clearly instead of NPE-ing
+      // deep inside ReflectionUtils
+      Preconditions.checkState(storeClass != null,
+          "getMappingTargetClass: MappingStore class is not set");
+      Class<?>[] classList = ReflectionUtils.getTypeArguments(
+        MappingStore.class, storeClass);
+      Preconditions.checkArgument(classList.length == 2,
+          "getMappingTargetClass: expected 2 type arguments for " +
+          "MappingStore, got %s", classList.length);
+      // Second type argument of MappingStore is the mapping target type
+      mappingTargetClass = (Class<? extends Writable>) classList[1];
+    }
+    return mappingTargetClass;
+  }
+
+  /**
+   * Create and return mappingTarget instance
+   *
+   * @return mappingTarget instance
+   */
+  public Writable createMappingTarget() {
+    Class<? extends Writable> targetClass = getMappingTargetClass();
+    return WritableUtils.createWritable(targetClass);
+  }
+
+  /**
    * Create a user {@link org.apache.giraph.edge.OutEdges}
    *
    * @return Instantiated user OutEdges

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
new file mode 100644
index 0000000..2666268
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ *
+ * Use this to load data for a BSP application.  Note that the InputSplit must
+ * also implement Writable.
+ *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this input
+ * format (context in getSplits and createMappingReader; methods invoked on
+ * MappingReader). So if backing input format relies on some parameters from
+ * configuration, you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public abstract class MappingInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends GiraphInputFormat<I, V, E> {
+
+  /**
+   * Create a mapping reader for a given split. Guaranteed to have been
+   * configured with setConf() prior to use.  The framework will also call
+   * {@link MappingReader#initialize(InputSplit,
+   * org.apache.hadoop.mapreduce.TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new mapping reader
+   * @throws IOException if the reader cannot be created
+   */
+  public abstract MappingReader<I, V, E, B> createMappingReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
new file mode 100644
index 0000000..b7ce97c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Will read the mapping from an input split.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public abstract class MappingReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements WorkerAggregatorUsage {
+
+  /**
+   * Aggregator usage for this mapping reader; set via
+   * {@link #setWorkerAggregatorUse(WorkerAggregatorUsage)} and used by
+   * {@link #aggregate} and {@link #getAggregatedValue}
+   */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+
+  /**
+   * Use the input split and context to setup reading the mapping entries.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading mapping entries.
+   * @param context Context from the task.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  public abstract void initialize(InputSplit inputSplit,
+    TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Set aggregator usage. It provides the functionality
+   * of aggregation operation in reading a mapping entry.
+   * It is invoked just after initialization.
+   * E.g.,
+   * mappingReader.initialize(inputSplit, context);
+   * mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+   * This method is only for use by the infrastructure.
+   *
+   * @param agg aggregator usage for mapping reader
+   */
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    workerAggregatorUsage = agg;
+  }
+
+  /**
+   * Advance to the next mapping entry.
+   *
+   * @return false iff there are no more mapping entries
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract boolean nextEntry() throws IOException,
+    InterruptedException;
+
+
+  /**
+   * Get the current entry.
+   *
+   * @return the current entry which has been read.
+   *         nextEntry() should be called first.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Close this {@link MappingReader} to future operations.
+   *
+   * @throws IOException
+   */
+  public abstract void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link MappingReader} consumed i.e.
+   * has been processed by?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract float getProgress() throws IOException, InterruptedException;
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    // Requires setWorkerAggregatorUse() to have been called first
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    // Requires setWorkerAggregatorUse() to have been called first
+    return workerAggregatorUsage.getAggregatedValue(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
new file mode 100644
index 0000000..72f8177
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link org.apache.giraph.io.MappingInputFormat} to make
+ * sure proper configuration parameters are passed around, that user can set
+ * parameters in configuration and they will be available in other methods
+ * related to this format.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingInputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingInputFormat<I, V, E, B> {
+  /** User-supplied mapping input format every call is delegated to */
+  private MappingInputFormat<I, V, E, B> wrappedFormat;
+
+  /**
+   * Wrap the given user mapping input format.
+   *
+   * @param mappingInputFormat original mappingInputFormat
+   */
+  public WrappedMappingInputFormat(
+      MappingInputFormat<I, V, E, B> mappingInputFormat) {
+    this.wrappedFormat = mappingInputFormat;
+  }
+
+  @Override
+  public void checkInputSpecs(Configuration conf) {
+    wrappedFormat.checkInputSpecs(conf);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    // Rebuild the context on top of our configuration before delegating
+    JobContext configuredContext =
+        HadoopUtils.makeJobContext(getConf(), context);
+    return wrappedFormat.getSplits(configuredContext, minSplitCountHint);
+  }
+
+  @Override
+  public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    TaskAttemptContext configuredContext =
+        HadoopUtils.makeTaskAttemptContext(getConf(), context);
+    MappingReader<I, V, E, B> userReader =
+        wrappedFormat.createMappingReader(split, configuredContext);
+    // Wrap the reader too, so it also sees our configuration
+    return new WrappedMappingReader<>(userReader, getConf());
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+    DataOutput dataOutput) throws IOException {
+    wrappedFormat.writeInputSplit(inputSplit, dataOutput);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+    DataInput dataInput) throws IOException, ClassNotFoundException {
+    return wrappedFormat.readInputSplit(dataInput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
new file mode 100644
index 0000000..7d1c4c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps {@link org.apache.giraph.io.MappingReader} to make sure proper
+ * configuration parameters are passed around, that parameters set in original
+ * configuration are available in methods of this reader
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingReader<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingReader<I, V, E, B> {
+  /** User set baseMappingReader wrapped over */
+  private final MappingReader<I, V, E, B> baseMappingReader;
+
+  /**
+   * Constructor
+   *
+   * @param baseMappingReader User set baseMappingReader
+   * @param conf configuration
+   */
+  public WrappedMappingReader(MappingReader<I, V, E, B> baseMappingReader,
+    ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    this.baseMappingReader = baseMappingReader;
+    // setConf is overridden below as a no-op, so go through super directly
+    super.setConf(conf);
+    baseMappingReader.setConf(conf);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // We don't want to use external configuration
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    baseMappingReader.initialize(inputSplit,
+        HadoopUtils.makeTaskAttemptContext(getConf(), context));
+  }
+
+
+  @Override
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    // Record it on the wrapper as well: aggregate() and getAggregatedValue()
+    // are inherited from MappingReader and read the wrapper's own field,
+    // which would otherwise stay null
+    super.setWorkerAggregatorUse(agg);
+    // Set aggregator usage for the wrapped mapping reader
+    baseMappingReader.setWorkerAggregatorUse(agg);
+  }
+
+  @Override
+  public boolean nextEntry() throws IOException, InterruptedException {
+    return baseMappingReader.nextEntry();
+  }
+
+  @Override
+  public MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException {
+    return baseMappingReader.getCurrentEntry();
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    baseMappingReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return baseMappingReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
new file mode 100644
index 0000000..d8b9c31
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Wraps {@link GiraphReader} for mapping into
+ * {@link org.apache.giraph.io.MappingReader}
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingReaderWrapper<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingReader<I, V, E, B> {
+  /** User-supplied reader producing mapping entries */
+  private final GiraphReader<MappingEntry<I, B>> giraphReader;
+  /**
+   * Adapts {@link #giraphReader} to the nextEntry()/getCurrentEntry() style
+   * of {@link org.apache.giraph.io.MappingReader}
+   */
+  private final IteratorToReaderWrapper<MappingEntry<I, B>> entryAdapter;
+
+  /**
+   * Constructor
+   *
+   * @param mappingReader user supplied mappingReader
+   */
+  public MappingReaderWrapper(GiraphReader<MappingEntry<I, B>> mappingReader) {
+    giraphReader = mappingReader;
+    entryAdapter = new IteratorToReaderWrapper<>(mappingReader);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
+    // Also pass the configuration on to the user reader
+    conf.configureIfPossible(giraphReader);
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    giraphReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public boolean nextEntry() throws IOException, InterruptedException {
+    return entryAdapter.nextObject();
+  }
+
+  @Override
+  public MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException {
+    return entryAdapter.getCurrentObject();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return giraphReader.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    giraphReader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
new file mode 100644
index 0000000..2e2310b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Implementation of basic methods in MappingStoreOps for LongWritable
+ * vertex ids and ByteWritable mapping targets
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public abstract class AbstractLongByteOps
+  implements MappingStoreOps<LongWritable, ByteWritable> {
+  /** Mapping store instance to operate on */
+  protected LongByteMappingStore mappingStore;
+
+  /**
+   * Remember the mapping store to operate on.
+   *
+   * @param mappingStore mapping store; it is cast to LongByteMappingStore,
+   *                     so any other implementation fails with
+   *                     ClassCastException
+   */
+  @Override
+  public void initialize(MappingStore<LongWritable,
+      ByteWritable> mappingStore) {
+    this.mappingStore = (LongByteMappingStore) mappingStore;
+  }
+
+  /**
+   * Compute partition given id, partitionCount, workerCount & target.
+   *
+   * Assumes zero based, consecutive indexing of partitions & workers, with
+   * worker w owning the block of numRows = ceil(partitionCount / workerCount)
+   * partitions starting at w * numRows. When partitionCount is not a
+   * multiple of workerCount the trailing block(s) are shorter or empty, so
+   * the offset inside the block is limited to partitions that exist; if the
+   * target worker's block is empty (or target is beyond the last worker),
+   * hash based partitioning is used instead of returning an invalid id.
+   *
+   * @param id vertex id
+   * @param partitionCount number of partitions
+   * @param workerCount number of workers
+   * @param target target worker (treated as unsigned), -1 if unassigned
+   * @return partition number in [0, partitionCount)
+   */
+  protected int computePartition(LongWritable id, int partitionCount,
+    int workerCount, byte target) {
+    if (target == -1) {
+      // default to hash based partitioning
+      return Math.abs(id.hashCode() % partitionCount);
+    }
+    // ceil(partitionCount / workerCount) partitions per worker block
+    int numRows = partitionCount / workerCount;
+    numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+    int targetWorker = target & 0xFF;
+    int firstPartition = numRows * targetWorker;
+    // the last block may be shorter than numRows (or empty)
+    int rowsForWorker = Math.min(numRows, partitionCount - firstPartition);
+    if (rowsForWorker <= 0) {
+      // no partition exists for this worker; avoid an out-of-range id
+      return Math.abs(id.hashCode() % partitionCount);
+    }
+    return firstPartition + Math.abs(id.hashCode() % rowsForWorker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
new file mode 100644
index 0000000..0b6f3e4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation used to embed target information into
+ * vertex id. Stores information in the higher order bits of the long id:
+ * bits 55-63 must be empty on input; afterwards bit 63 is still 0 and
+ * bits 55-62 hold (target + 1), so that 0 means "no target".
+ */
+public class DefaultEmbeddedLongByteOps extends AbstractLongByteOps {
+  /** Index of the lowest bit used for the embedded target */
+  private static final int SHIFT = 55;
+  /** Bit mask for first 9 bits in a long */
+  private static final long MASK = ((long) 0x1FF) << SHIFT;
+  /** Inverse of MASK */
+  private static final long IMASK = ~MASK;
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultEmbeddedLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return true;
+  }
+
+  /**
+   * Embed the target from the mapping store into the high order bits of
+   * id; id is modified in place.
+   *
+   * @param id vertex id whose first 9 bits must be 0
+   * @throws IllegalStateException if any of the first 9 bits of id is set
+   */
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    if ((id.get() & MASK) != 0) {
+      throw new IllegalStateException("Expected first 9 bits of long " +
+          "to be empty, got id " + id.get());
+    }
+    byte target = mappingStore.getByteTarget(id);
+    // first bit = 0 & rest 8 bits set to target + 1
+    // add 1 to distinguish between not set and assignment to worker-0
+    // (prefix bits = 0 can mean one of two things :
+    // no entry in the mapping, in which case target = -1, so -1 + 1 = 0
+    // vertex is created later during computation, so prefix bits are 0 anyway)
+    long maskValue = ((1L + target) & 0xFF) << SHIFT;
+    id.set(id.get() | maskValue);
+  }
+
+  /**
+   * Clear the first 9 bits of id (in place).
+   *
+   * @param id vertex id
+   */
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    id.set(id.get() & IMASK);
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // extract the 8 embedded target bits (bits 55-62) and subtract 1 to
+    // undo the +1 added in embedTargetInfo (0, i.e. unset, becomes -1)
+    byte target = (byte) (((id.get() >>> SHIFT) & 0xFF) - 1);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
new file mode 100644
index 0000000..57ece94
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation which reads partition information from map:
+ * getPartition queries the mapping store on every call and no target
+ * information is embedded into vertex ids.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DefaultLongByteOps extends AbstractLongByteOps {
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return false;
+  }
+
+  /**
+   * Not supported: this implementation does not embed target information.
+   *
+   * @param id vertex id
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException("embedTargetInfo: " +
+        "DefaultLongByteOps does not embed target info");
+  }
+
+  /**
+   * Not supported: this implementation does not embed target information.
+   *
+   * @param id vertex id
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException("removeTargetInfo: " +
+        "DefaultLongByteOps does not embed target info");
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // a -1 from the store (no mapping) makes computePartition hash the id
+    byte target = mappingStore.getByteTarget(id);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
new file mode 100644
index 0000000..996fed0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import com.google.common.collect.MapMaker;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * An implementation of {@code MappingStore<LongWritable, ByteWritable>}
+ *
+ * A vertex id is split into a prefix (id shifted right, unsigned, by
+ * log_2(lower)) and a suffix (its lower log_2(lower) bits). Each prefix maps
+ * to a byte array of length lower indexed by the suffix; slots that were
+ * never assigned hold -1.
+ *
+ * Methods implemented here are thread safe by default because it is guaranteed
+ * that each entry is written to only once.
+ * It can represent up to a maximum of 254 workers
+ * any byte passed is treated as unsigned
+ */
+@ThreadSafe
+public class LongByteMappingStore
+  extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
+  Writable> implements MappingStore<LongWritable, ByteWritable> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+    LongByteMappingStore.class);
+
+  /** Counts number of entries added */
+  private final AtomicLong numEntries = new AtomicLong(0);
+
+  /** Id prefix to bytes array mapping, written while filling */
+  private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
+  /** Primitive idToBytes for faster querying, populated by postFilling */
+  private Long2ObjectOpenHashMap<byte[]> idToBytes;
+  /** Length of each bytes array (a power of two, 2^lowerOrder) */
+  private int lower;
+  /** Expected number of distinct prefixes (used as initial capacity) */
+  private int upper;
+  /** Bit mask for lowerOrder suffix bits */
+  private int lowerBitMask;
+  /** LowerOrder bits count */
+  private int lowerOrder;
+
+  /**
+   * Read sizing parameters from the configuration and allocate the maps.
+   *
+   * @throws IllegalStateException if lower is not a positive power of two
+   */
+  @Override
+  public void initialize() {
+    upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
+    lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
+
+    // 0 and Integer.MIN_VALUE pass the bit trick alone, but would break
+    // the mask / array allocation below, so require lower > 0 as well
+    if (lower <= 0 || (lower & (lower - 1)) != 0) {
+      throw new IllegalStateException("lower not a power of two: " + lower);
+    }
+
+    lowerBitMask = lower - 1;
+    lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)
+    concurrentIdToBytes = new MapMaker()
+        .initialCapacity(upper)
+        .concurrencyLevel(getConf().getNumInputSplitsThreads())
+        .makeMap();
+    idToBytes = new Long2ObjectOpenHashMap<>(upper);
+  }
+
+  /**
+   * Look up the raw target byte of a vertex id. Only entries copied into
+   * idToBytes by postFilling are visible here.
+   *
+   * @param vertexId vertexId
+   * @return byte value of target, -1 if no target is stored
+   */
+  public byte getByteTarget(LongWritable vertexId) {
+    long key = vertexId.get() >>> lowerOrder;
+    int suffix = (int) (vertexId.get() & lowerBitMask);
+    // single lookup: stored arrays are never null, so null means absent
+    byte[] bytes = idToBytes.get(key);
+    if (bytes == null) {
+      return -1;
+    }
+    return bytes[suffix];
+  }
+
+  @Override
+  public void addEntry(LongWritable vertexId, ByteWritable target) {
+    long key = vertexId.get() >>> lowerOrder;
+    byte[] bytes = concurrentIdToBytes.get(key);
+    if (bytes == null) {
+      byte[] newBytes = new byte[lower];
+      Arrays.fill(newBytes, (byte) -1);
+      // another thread may have raced us; keep whichever array won
+      bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
+      if (bytes == null) {
+        bytes = newBytes;
+      }
+    }
+    bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
+    numEntries.getAndIncrement(); // increment count
+  }
+
+  @Override
+  public ByteWritable getTarget(LongWritable vertexId,
+    ByteWritable target) {
+    byte bval = getByteTarget(vertexId);
+    if (bval == -1) { // worker not assigned by mapping
+      return null;
+    }
+    target.set(bval);
+    return target;
+  }
+
+  @Override
+  public void postFilling() {
+    // not thread-safe; moves everything into the primitive map for queries
+    idToBytes.putAll(concurrentIdToBytes);
+    concurrentIdToBytes.clear();
+    concurrentIdToBytes = null;
+  }
+
+  @Override
+  public long getStats() {
+    return numEntries.longValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
new file mode 100644
index 0000000..8c8efa1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An entry in MappingStore: a vertex id paired with its mapping target.
+ * Both parts can be replaced through the setters.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class MappingEntry<I extends WritableComparable, B extends Writable> {
+  /** Vertex Id */
+  private I vertexId;
+  /** Mapping Target */
+  private B mappingTarget;
+
+  /**
+   * Constructor
+   *
+   * @param vertexId vertexId
+   * @param mappingTarget mappingTarget
+   */
+  public MappingEntry(I vertexId, B mappingTarget) {
+    this.mappingTarget = mappingTarget;
+    this.vertexId = vertexId;
+  }
+
+  /**
+   * Vertex id of this entry.
+   *
+   * @return vertex id
+   */
+  public I getVertexId() {
+    return this.vertexId;
+  }
+
+  /**
+   * Replace the vertex id of this entry.
+   *
+   * @param vertexId new vertex id
+   */
+  public void setVertexId(I vertexId) {
+    this.vertexId = vertexId;
+  }
+
+  /**
+   * Mapping target of this entry.
+   *
+   * @return mapping target
+   */
+  public B getMappingTarget() {
+    return this.mappingTarget;
+  }
+
+  /**
+   * Replace the mapping target of this entry.
+   *
+   * @param mappingTarget new mapping target
+   */
+  public void setMappingTarget(B mappingTarget) {
+    this.mappingTarget = mappingTarget;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
new file mode 100644
index 0000000..5b872a4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * MappingStore used to store the vertexId - target map supplied by user.
+ * Implementations must make their methods thread safe.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface MappingStore<I extends WritableComparable, B extends Writable>
+  extends ImmutableClassesGiraphConfigurable<I, Writable, Writable> {
+  /**
+   * Prepare the store; has to be called before any other method of this
+   * instance is used.
+   */
+  void initialize();
+
+  /**
+   * Record the target of a vertex id.
+   *
+   * @param vertexId vertex id being mapped
+   * @param target target the vertex id is mapped to
+   */
+  void addEntry(I vertexId, B target);
+
+  /**
+   * Look up the target of a vertex id.
+   *
+   * @param vertexId vertex id to look up
+   * @param target instance to write the target information into
+   * @return target instance (an implementation may return null when it
+   *         has no target for vertexId, as LongByteMappingStore does)
+   */
+  B getTarget(I vertexId, B target);
+
+  /**
+   * Hook invoked after entries have been added to the mapping store.
+   */
+  void postFilling();
+
+  /**
+   * Statistics about this mapping store.
+   *
+   * @return numEntries
+   */
+  long getStats();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
new file mode 100644
index 0000000..aba034e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Operations that can be run against a mapping store once it has been
+ * completely loaded.
+ *
+ * @param <I> vertex id type
+ * @param <B> mapping target type
+ */
+public interface MappingStoreOps<I extends WritableComparable,
+  B extends Writable> {
+  /**
+   * Bind this instance to a mapping store; has to be called before any
+   * other method.
+   *
+   * @param mappingStore mapping store instance to operate on
+   */
+  void initialize(MappingStore<I, B> mappingStore);
+
+  /**
+   * Whether this implementation works by embedding target information.
+   *
+   * @return true if worker info is embedded into vertex ids
+   */
+  boolean hasEmbedding();
+
+  /**
+   * Write target information into the given vertex id.
+   *
+   * @param id vertexId
+   */
+  void embedTargetInfo(I id);
+
+  /**
+   * Strip target information from the given vertex id.
+   *
+   * @param id vertexId
+   */
+  void removeTargetInfo(I id);
+
+  /**
+   * Compute the partition a vertex id belongs to.
+   *
+   * @param id vertexId
+   * @param partitionCount total number of partitions
+   * @param workerCount total number of workers
+   * @return partition of vertex id
+   */
+  int getPartition(I id, int partitionCount, int workerCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
new file mode 100644
index 0000000..ae23475
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Definitions and implementations of MappingStore, MappingStoreOps and
+ * related concepts
+ */
+package org.apache.giraph.mapping;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
new file mode 100644
index 0000000..102ef50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Basic implementation of Translate Edge
+ * where I = LongWritable & B = ByteWritable: edge target ids are translated
+ * by embedding their mapping target through the worker's MappingStoreOps.
+ *
+ * @param <E> edge value type
+ */
+@SuppressWarnings("unchecked")
+public class LongByteTranslateEdge<E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable
+  implements TranslateEdge<LongWritable, E> {
+
+  /** Local data used for targetId translation of edge */
+  private LocalData<LongWritable,
+    ? extends Writable, E, ByteWritable> localData;
+
+  @Override
+  public void initialize(BspServiceWorker<LongWritable,
+    ? extends Writable, E> service) {
+    localData = (LocalData<LongWritable, ? extends Writable, E, ByteWritable>)
+      service.getLocalData();
+  }
+
+  /**
+   * Copy targetId and embed its target information into the copy;
+   * targetId itself is not modified.
+   *
+   * @param targetId edge target id
+   * @return new translated id instance
+   */
+  @Override
+  public LongWritable translateId(LongWritable targetId) {
+    LongWritable translatedId = new LongWritable(targetId.get());
+    localData.getMappingStoreOps().embedTargetInfo(translatedId);
+    return translatedId;
+  }
+
+  /**
+   * Not supported for a generic edge value type.
+   * If vertex input does not create edges, LongByteTranslateEdge can be
+   * used directly; otherwise use the nested subclass that matches the
+   * edge value type.
+   *
+   * @param edgeValue edge value
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public E cloneValue(E edgeValue) {
+    throw new UnsupportedOperationException("cloneValue is not " +
+        "supported by LongByteTranslateEdge; use a nested subclass such " +
+        "as NoEdgeValue or LongEdgeValue");
+  }
+
+  /**
+   * cloneValue for NullWritable edge values (returns the singleton)
+   */
+  public static class NoEdgeValue
+    extends LongByteTranslateEdge<NullWritable> {
+    @Override
+    public NullWritable cloneValue(NullWritable edgeValue) {
+      return NullWritable.get();
+    }
+  }
+
+  /**
+   * cloneValue for IntWritable edge values
+   */
+  public static class IntEdgeValue
+    extends LongByteTranslateEdge<IntWritable> {
+    @Override
+    public IntWritable cloneValue(IntWritable edgeValue) {
+      return new IntWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * cloneValue for LongWritable edge values
+   */
+  public static class LongEdgeValue
+    extends LongByteTranslateEdge<LongWritable> {
+    @Override
+    public LongWritable cloneValue(LongWritable edgeValue) {
+      return new LongWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * cloneValue for FloatWritable edge values
+   */
+  public static class FloatEdgeValue
+    extends LongByteTranslateEdge<FloatWritable> {
+    @Override
+    public FloatWritable cloneValue(FloatWritable edgeValue) {
+      return new FloatWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * cloneValue for DoubleWritable edge values
+   */
+  public static class DoubleEdgeValue
+    extends LongByteTranslateEdge<DoubleWritable> {
+    @Override
+    public DoubleWritable cloneValue(DoubleWritable edgeValue) {
+      return new DoubleWritable(edgeValue.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
new file mode 100644
index 0000000..85e8768
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Used to conduct expensive translation of edges
+ * during the vertex input phase: translates edge target ids and clones
+ * edge values.
+ *
+ * @param <I> vertexId type
+ * @param <E> edgeValue type
+ */
+public interface TranslateEdge<I extends WritableComparable, E extends Writable>
+  extends ImmutableClassesGiraphConfigurable {
+  /**
+   * Set up this translator; has to be called before the other methods.
+   *
+   * @param service bsp service worker
+   */
+  void initialize(BspServiceWorker<I, ? extends Writable, E> service);
+
+  /**
+   * Translate an edge target id into a freshly allocated instance.
+   *
+   * @param targetId edge target Id
+   * @return a new translated Id instance
+   */
+  I translateId(I targetId);
+
+  /**
+   * Produce a copy of an edge value.
+   *
+   * @param edgeValue edge value to copy
+   * @return clone of edge value
+   */
+  E cloneValue(E edgeValue);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
new file mode 100644
index 0000000..8536e83
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Definitions & sample implementations of edge translation logic
+ */
+package org.apache.giraph.mapping.translate;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 90dc9f3..e367b94 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -40,6 +40,7 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.partition.MasterGraphPartitioner;
 import org.apache.giraph.partition.PartitionOwner;
@@ -119,7 +120,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
  * @param <V> Vertex data
  * @param <E> Edge data
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("rawtypes, unchecked")
 public class BspServiceMaster<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends BspService<I, V, E>
@@ -217,7 +218,7 @@ public class BspServiceMaster<I extends WritableComparable,
     observers = conf.createMasterObservers();
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    GiraphStats.init(context);
+    GiraphStats.init((Mapper.Context) context);
   }
 
   @Override
@@ -676,6 +677,17 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
+  public int createMappingInputSplits() {
+    if (getConfiguration().hasMappingInputFormat()) {
+      MappingInputFormat<I, V, E, ? extends Writable> format =
+          getConfiguration().createWrappedMappingInputFormat();
+      return createInputSplits(format, mappingInputSplitsPaths, "Mapping");
+    }
+    // No mapping input format configured: nothing to split
+    return 0;
+  }
+
+  @Override
   public int createVertexInputSplits() {
     // Short-circuit if there is no vertex input format
     if (!getConfiguration().hasVertexInputFormat()) {
@@ -1589,8 +1601,12 @@ public class BspServiceMaster<I extends WritableComparable,
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
-      // vertex loading and edge loading
       initializeAggregatorInputSuperstep();
+      if (getConfiguration().hasMappingInputFormat()) {
+        coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
+            "Mapping");
+      }
+      // vertex loading and edge loading
       if (getConfiguration().hasVertexInputFormat()) {
         coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
             "Vertex");


[2/3] GIRAPH-908: support for partitioned input in giraph (pavanka)

Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 15dbe07..0635210 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -107,7 +107,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
         // Attempt to create InputSplits if necessary. Bail out if that fails.
         if (bspServiceMaster.getRestartedSuperstep() !=
             BspService.UNSET_SUPERSTEP ||
-            (bspServiceMaster.createVertexInputSplits() != -1 &&
+            (bspServiceMaster.createMappingInputSplits() != -1 &&
+                bspServiceMaster.createVertexInputSplits() != -1 &&
                 bspServiceMaster.createEdgeInputSplits() != -1)) {
           long setupMillis = System.currentTimeMillis() - initializeMillis;
           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
@@ -123,7 +124,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
             superstepState = bspServiceMaster.coordinateSuperstep();
             long superstepMillis = System.currentTimeMillis() -
                 startSuperstepMillis;
-            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
+            superstepSecsMap.put(cachedSuperstep,
                 superstepMillis / 1000.0d);
             if (LOG.isInfoEnabled()) {
               LOG.info("masterThread: Coordination of superstep " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index 4200d79..c5e2f3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,7 +33,14 @@ import org.apache.hadoop.io.WritableComparable;
 @SuppressWarnings("rawtypes")
 public interface GraphPartitionerFactory<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    ImmutableClassesGiraphConfigurable {
+    ImmutableClassesGiraphConfigurable<I, V, E> {
+
+  /**
+   * Use some local data present in the worker
+   *
+   * @param localData localData present in the worker
+   */
+  void initialize(LocalData<I, V, E, ? extends Writable> localData);
   /**
    * Create the {@link MasterGraphPartitioner} used by the master.
    * Instantiated once by the master and reused.

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index 7cc5651..221e50d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Saved configuration */
-  private ImmutableClassesGiraphConfiguration conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E>  {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+    // Intentionally empty: this factory does not use worker-local data.
+  }
 
   @Override
   public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashPartitionerFactory<I extends WritableComparable,
   public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new HashWorkerPartitioner<I, V, E>();
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 1eeece7..5f7ee40 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Saved configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E> {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+    // Intentionally empty: this factory does not use worker-local data.
+  }
 
   @Override
   public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashRangePartitionerFactory<I extends WritableComparable,
   public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new HashRangeWorkerPartitioner<I, V, E>();
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
new file mode 100644
index 0000000..e129050
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Factory for long-byte mapping based partitioners.
+ *
+ * Vertex-to-partition assignment is delegated to the mapping store ops
+ * exposed by the worker's {@link LocalData}, which must be supplied via
+ * {@link #initialize} before any partition lookup. Partitions are assigned
+ * to workers in contiguous ranges of ceil(partitionCount / workerCount).
+ *
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+@SuppressWarnings("unchecked")
+public class LongMappingStorePartitionerFactory<V extends Writable,
+    E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+  /** Logger Instance */
+  private static final Logger LOG = Logger.getLogger(
+      LongMappingStorePartitionerFactory.class);
+  /** Local Data that supplies the mapping store */
+  protected LocalData<LongWritable, V, E, ? extends Writable> localData = null;
+
+  @Override
+  public void initialize(LocalData<LongWritable, V, E,
+    ? extends Writable> localData) {
+    this.localData = localData;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Initializing LongMappingStorePartitionerFactory " +
+          "with localData");
+    }
+  }
+
+  @Override
+  protected int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // Fail with a descriptive error instead of an opaque NPE when the
+    // factory is used without mapping data having been loaded.
+    if (localData == null) {
+      throw new IllegalStateException("getPartition: localData has not " +
+          "been initialized; LongMappingStorePartitionerFactory requires a " +
+          "mapping input format");
+    }
+    return localData.getMappingStoreOps().getPartition(id,
+        partitionCount, workerCount);
+  }
+
+  @Override
+  protected int getWorker(int partition, int partitionCount, int workerCount) {
+    // numRows = ceil(partitionCount / workerCount), i.e. the number of
+    // consecutive partitions handed to each worker.
+    int numRows = partitionCount / workerCount;
+    numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+    return partition / numRows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 8ab692f..5dd580b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleIntRangePartitionerFactory
-    <V extends Writable, E extends Writable>
-    extends SimplePartitionerFactory<IntWritable, V, E> {
+public class SimpleIntRangePartitionerFactory<V extends Writable,
+  E extends Writable> extends SimplePartitionerFactory<IntWritable, V, E> {
 
   /** Vertex key space size. */
   private int keySpaceSize;
 
   @Override
+  protected int getPartition(IntWritable id, int partitionCount,
+    int workerCount) {
+    // Int-range partitioning does not use workerCount; delegate to the
+    // two-argument overload below (kept so subclasses can override it).
+    return getPartition(id, partitionCount);
+  }
+
+  /**
+   * Calculates in which partition current vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
   protected int getPartition(IntWritable id, int partitionCount) {
     return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 2989598..e637e16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleLongRangePartitionerFactory
-    <V extends Writable, E extends Writable>
-    extends SimplePartitionerFactory<LongWritable, V, E> {
+public class SimpleLongRangePartitionerFactory<V extends Writable,
+  E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
 
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
+  protected int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // Long-range partitioning does not use workerCount; delegate to the
+    // two-argument overload below (kept so subclasses can override it).
+    return getPartition(id, partitionCount);
+  }
+
+  /**
+   * Calculates in which partition current vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
   protected int getPartition(LongWritable id, int partitionCount) {
     return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
index 15b0756..1e29846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -33,14 +34,17 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <E> Edge value
  */
 public abstract class SimplePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E> {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+    // Default: ignore localData. Subclasses that need worker-local data
+    // (e.g. a mapping store) override this method.
+  }
 
   @Override
   public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new SimpleMasterPartitioner<I, V, E>(conf) {
+    return new SimpleMasterPartitioner<I, V, E>(getConf()) {
       @Override
       protected int getWorkerIndex(int partition, int partitionCount,
           int workerCount) {
@@ -54,31 +58,26 @@ public abstract class SimplePartitionerFactory<I extends WritableComparable,
   public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new SimpleWorkerPartitioner<I, V, E>() {
       @Override
-      protected int getPartitionIndex(I id, int partitionCount) {
-        return SimplePartitionerFactory.this.getPartition(id, partitionCount);
+      protected int getPartitionIndex(I id, int partitionCount,
+        int workerCount) {
+        return SimplePartitionerFactory.this.getPartition(id,
+            partitionCount, workerCount);
       }
     };
   }
 
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public final ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
   /**
    * Calculates in which partition current vertex belongs to,
    * from interval [0, partitionCount).
    *
    * @param id Vertex id
    * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
    * @return partition
    */
-  protected abstract int getPartition(I id, int partitionCount);
+  protected abstract int getPartition(I id, int partitionCount,
+    int workerCount);
+
   /**
    * Calculates worker that should be responsible for passed partition.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
index 600d7a3..3c0de44 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -19,13 +19,16 @@
 package org.apache.giraph.partition;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
 
 /**
  * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
@@ -38,8 +41,13 @@ import com.google.common.collect.Lists;
 public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
     V extends Writable, E extends Writable>
     implements WorkerGraphPartitioner<I, V, E> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+      SimpleWorkerPartitioner.class);
   /** List of {@link PartitionOwner}s for this worker. */
   private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+  /** Set of workers that own at least one partition */
+  private Set<WorkerInfo> availableWorkers = new HashSet<>();
 
   @Override
   public PartitionOwner createPartitionOwner() {
@@ -49,7 +57,8 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   @Override
   public PartitionOwner getPartitionOwner(I vertexId) {
     return partitionOwnerList.get(
-        getPartitionIndex(vertexId, partitionOwnerList.size()));
+        getPartitionIndex(vertexId, partitionOwnerList.size(),
+            availableWorkers.size()));
   }
 
   @Override
@@ -64,8 +73,11 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
       PartitionStore<I, V, E> partitionStore) {
-    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
-        myWorkerInfo, masterSetPartitionOwners, partitionStore);
+    PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
+        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners,
+        partitionStore);
+    extractAvailableWorkers();
+    return exchange;
   }
 
   @Override
@@ -74,12 +86,26 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   }
 
   /**
+   * Update availableWorkers
+   */
+  public void extractAvailableWorkers() {
+    // Rebuild from scratch so workers that no longer own any partition
+    // are dropped.
+    availableWorkers.clear();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      availableWorkers.add(partitionOwner.getWorkerInfo());
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
+          " workers are available");
+    }
+  }
+
+  /**
    * Calculates in which partition current vertex belongs to,
    * from interval [0, partitionCount).
    *
    * @param id Vertex id
    * @param partitionCount Number of partitions
+   * @param workerCount Number of active workers
    * @return partition
    */
-  protected abstract int getPartitionIndex(I id, int partitionCount);
+  protected abstract int getPartitionIndex(I id, int partitionCount,
+    int workerCount);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index aff7084..104932c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -46,6 +46,7 @@ import org.apache.giraph.io.EdgeWriter;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.master.SuperstepClasses;
 import org.apache.giraph.metrics.GiraphMetrics;
@@ -133,7 +134,10 @@ public class BspServiceWorker<I extends WritableComparable,
   private final WorkerInfo workerInfo;
   /** Worker graph partitioner */
   private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
-
+  /** Local Data for each worker */
+  private final LocalData<I, V, E, ? extends Writable> localData;
+  /** Used to translate Edges during vertex input phase based on localData */
+  private final TranslateEdge<I, E> translateEdge;
   /** IPC Client */
   private final WorkerClient<I, V, E> workerClient;
   /** IPC Server */
@@ -182,6 +186,11 @@ public class BspServiceWorker<I extends WritableComparable,
     throws IOException, InterruptedException {
     super(context, graphTaskManager);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+    localData = new LocalData<>(conf);
+    translateEdge = getConfiguration().edgeTranslationInstance();
+    if (translateEdge != null) {
+      translateEdge.initialize(this);
+    }
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
@@ -237,6 +246,14 @@ public class BspServiceWorker<I extends WritableComparable,
     return workerClient;
   }
 
+  /**
+   * Get the worker-local data created for this worker in the constructor.
+   *
+   * @return local data of this worker
+   */
+  public LocalData<I, V, E, ? extends Writable> getLocalData() {
+    return localData;
+  }
+
+  /**
+   * Get the edge translator used during the vertex input phase.
+   *
+   * @return the TranslateEdge instance; may be null (the constructor only
+   *         initializes it when non-null)
+   */
+  public TranslateEdge<I, E> getTranslateEdge() {
+    return translateEdge;
+  }
+
   /**
    * Intended to check the health of the node.  For instance, can it ssh,
    * dmesg, etc. For now, does nothing.
@@ -293,6 +310,55 @@ public class BspServiceWorker<I extends WritableComparable,
     return vertexEdgeCount;
   }
 
+  /**
+   * Load the mapping entries from the user-defined
+   * {@link org.apache.giraph.io.MappingReader}
+   *
+   * Input splits are processed by at most
+   * {@code getConfiguration().getNumInputSplitsThreads()} threads, and never
+   * by more threads than there are splits. After all threads have finished,
+   * {@code postFilling()} is called on the local mapping store.
+   *
+   * @return Count of mapping entries loaded
+   * @throws KeeperException if listing the mapping input splits in
+   *                         ZooKeeper fails
+   * @throws InterruptedException if the thread is interrupted (e.g. during
+   *                              the ZooKeeper call)
+   */
+  private Integer loadMapping() throws KeeperException,
+    InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
+        false, false, true);
+
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+
+    MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
+        mappingInputSplitsCallableFactory =
+        new MappingInputSplitsCallableFactory<>(
+            getConfiguration().createWrappedMappingInputFormat(),
+            splitOrganizer,
+            getContext(),
+            getConfiguration(),
+            this,
+            getZkExt());
+
+    // Never start more threads than there are splits to process
+    int maxInputSplitThreads = inputSplitPathList.size();
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadMapping: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " thread(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+
+    List<Integer> results =
+        ProgressableUtils.getResultsWithNCallables(
+            mappingInputSplitsCallableFactory,
+            numThreads, "load-mapping-%d", getContext());
+    int entriesLoaded = 0;
+    for (Integer result : results) {
+      entriesLoaded += result;
+    }
+    // after all threads finish loading - call postFilling
+    localData.getMappingStore().postFilling();
+    return entriesLoaded;
+  }
 
   /**
    * Load the vertices from the user-defined
@@ -403,13 +469,15 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   /**
-   * Wait for all workers to finish processing input splits.
+   * Mark current worker as done and then wait for all workers
+   * to finish processing input splits.
    *
    * @param inputSplitPaths Input split paths
    * @param inputSplitEvents Input split events
    */
-  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
-                                   InputSplitEvents inputSplitEvents) {
+  private void markCurrentWorkerDoneThenWaitForOthers(
+    InputSplitPaths inputSplitPaths,
+    InputSplitEvents inputSplitEvents) {
     String workerInputSplitsDonePath =
         inputSplitPaths.getDonePath() + "/" +
             getWorkerInfo().getHostnameId();
@@ -420,10 +488,12 @@ public class BspServiceWorker<I extends WritableComparable,
           CreateMode.PERSISTENT,
           true);
     } catch (KeeperException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
+      throw new IllegalStateException(
+          "markCurrentWorkerDoneThenWaitForOthers: " +
           "KeeperException creating worker done splits", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
+      throw new IllegalStateException(
+          "markCurrentWorkerDoneThenWaitForOthers: " +
           "InterruptedException creating worker done splits", e);
     }
     while (true) {
@@ -433,10 +503,12 @@ public class BspServiceWorker<I extends WritableComparable,
             getZkExt().exists(inputSplitPaths.getAllDonePath(),
                 true);
       } catch (KeeperException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
+        throw new IllegalStateException(
+            "markCurrentWorkerDoneThenWaitForOthers: " +
             "KeeperException waiting on worker done splits", e);
       } catch (InterruptedException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
+        throw new IllegalStateException(
+            "markCurrentWorkerDoneThenWaitForOthers: " +
             "InterruptedException waiting on worker done splits", e);
       }
       if (inputSplitsDoneStat != null) {
@@ -501,6 +573,37 @@ public class BspServiceWorker<I extends WritableComparable,
     aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 
     VertexEdgeCount vertexEdgeCount;
+    int entriesLoaded = 0;
+
+    if (getConfiguration().hasMappingInputFormat()) {
+      // Ensure the mapping InputSplits are ready for processing
+      ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
+      getContext().progress();
+      try {
+        entriesLoaded = loadMapping();
+        // successfully loaded mapping
+        // now initialize graphPartitionerFactory with this data
+        getGraphPartitionerFactory().initialize(localData);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadMapping failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadMapping failed with KeeperException", e);
+      }
+      getContext().progress();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Finally loaded a total of " +
+            entriesLoaded + " entries from inputSplits");
+      }
+
+      // Workers wait for each other to finish, coordinated by master
+      markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
+          mappingInputSplitsEvents);
+      // Print stats for data stored in localData once mapping is fully
+      // loaded on all the workers
+      localData.printStats();
+    }
 
     if (getConfiguration().hasVertexInputFormat()) {
       // Ensure the vertex InputSplits are ready for processing
@@ -544,12 +647,14 @@ public class BspServiceWorker<I extends WritableComparable,
 
     if (getConfiguration().hasVertexInputFormat()) {
       // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
+          vertexInputSplitsEvents);
     }
 
     if (getConfiguration().hasEdgeInputFormat()) {
       // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
+          edgeInputSplitsEvents);
     }
 
     // Create remaining partitions owned by this worker.
@@ -569,6 +674,8 @@ public class BspServiceWorker<I extends WritableComparable,
       getServerData().getEdgeStore().moveEdgesToVertices();
     }
 
+    localData.removeMappingStoreIfPossible();
+
     // Generate the partition stats for the input superstep and process
     // if necessary
     List<PartitionStats> partitionStatsList =
@@ -726,6 +833,17 @@ public class BspServiceWorker<I extends WritableComparable,
         getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startSuperstep: addressesAndPartitions" +
+          addressesAndPartitions.getWorkerInfos());
+      for (PartitionOwner partitionOwner : addressesAndPartitions
+          .getPartitionOwners()) {
+        LOG.debug(partitionOwner.getPartitionId() + " " +
+            partitionOwner.getWorkerInfo());
+      }
+    }
+
     return addressesAndPartitions.getPartitionOwners();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 828eac4..35ad94b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -48,6 +48,7 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("unchecked")
 public class EdgeInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends InputSplitsCallable<I, V, E> {
@@ -62,10 +63,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
 
   /** Aggregator handler */
   private final WorkerThreadAggregatorUsage aggregatorUsage;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Edge input format */
   private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
+  /** Can embedInfo in vertexIds */
+  private final boolean canEmbedInIds;
 
   /** Filter to use */
   private final EdgeInputFilter<I, E> edgeInputFilter;
@@ -97,11 +102,19 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         zooKeeperExt);
     this.edgeInputFormat = edgeInputFormat;
 
+    this.bspServiceWorker = bspServiceWorker;
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
     // Initialize aggregator usage.
     this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
       .newThreadAggregatorUsage();
     edgeInputFilter = configuration.getEdgeInputFilter();
+    canEmbedInIds = bspServiceWorker
+        .getLocalData()
+        .getMappingStoreOps() != null &&
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .hasEmbedding();
 
     // Initialize Metrics
     totalEdgesMeter = getTotalEdgesLoadedMeter();
@@ -157,6 +170,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
             "readInputSplit: Edge reader returned an edge " +
                 "without a value!  - " + readerEdge);
       }
+      if (canEmbedInIds) {
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(sourceId);
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(readerEdge.getTargetVertexId());
+      }
 
       ++inputSplitEdgesLoaded;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
new file mode 100644
index 0000000..4e93ce0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Callable that reads input splits from a list of ZooKeeper split paths.
+ * Split positions are claimed from an {@link AtomicInteger}, so several
+ * callables sharing that counter split the work between them. Subclasses
+ * supply the input format used to deserialize a split and the logic that
+ * reads entries out of one split.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+public abstract class FullInputSplitCallable<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  implements Callable<Integer> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+    FullInputSplitCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** Context */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+
+  /** The List of InputSplit znode paths (copied from the organizer) */
+  private final List<String> pathList;
+  /**
+   * Current position in the path list. Positions are claimed with
+   * getAndIncrement(), so callables sharing this counter never claim the
+   * same position twice.
+   */
+  private final AtomicInteger currentIndex;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Construction time in nanos; used to report throughput in call() */
+  private final long startNanos = TIME.getNanoseconds();
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Constructor.
+   *
+   * @param splitOrganizer Input splits organizer
+   * @param context Context
+   * @param configuration Configuration
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param currentIndex Atomic Integer to get splitPath from list
+   */
+  public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      ZooKeeperExt zooKeeperExt,
+      AtomicInteger currentIndex) {
+    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
+    this.currentIndex = currentIndex;
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    this.configuration = configuration;
+  }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Get input format
+   *
+   * @return Input format used to deserialize input splits
+   */
+  public abstract GiraphInputFormat getInputFormat();
+
+  /**
+   * Read all entries from a single input split.
+   *
+   * @param inputSplit Input split to load
+   * @return Number of entries loaded from this split
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  protected abstract Integer readInputSplit(InputSplit inputSplit)
+    throws IOException, InterruptedException;
+
+  /**
+   * Repeatedly claim the next unprocessed split position and load that
+   * split, until the path list is exhausted.
+   *
+   * @return Total number of entries loaded by this callable
+   */
+  @Override
+  public Integer call() {
+    int entries = 0;
+    String inputSplitPath;
+    int inputSplitsProcessed = 0;
+    try {
+      while (true) {
+        int pos = currentIndex.getAndIncrement();
+        if (pos >= pathList.size()) {
+          break;
+        }
+        inputSplitPath = pathList.get(pos);
+        entries += loadInputSplit(inputSplitPath);
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for whoever owns this thread
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float entriesPerSecond = entries / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + entries +
+          " entries, " + entriesPerSecond + " entries/sec");
+    }
+    return entries;
+  }
+
+  /**
+   * Fetch the input split stored at the given ZooKeeper path and read all
+   * of its entries through {@link #readInputSplit(InputSplit)}.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @return Number of entries read in this input split
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private Integer loadInputSplit(
+    String inputSplitPath)
+    throws IOException, ClassNotFoundException, InterruptedException,
+    InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
+    Integer entriesRead = readInputSplit(inputSplit);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplit: Finished loading " +
+          inputSplitPath + " " + entriesRead);
+    }
+    return entriesRead;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws IllegalStateException if the ZooKeeper read fails or is
+   *         interrupted
+   */
+  protected InputSplit getInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status before converting to unchecked
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "getInputSplit: InterruptedException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplit: Processing " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
new file mode 100644
index 0000000..9612344
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Stores worker-local data. Currently this is the (optional) mapping store
+ * and the operations object that works on it.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertex value type
+ * @param <E> edge value type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class LocalData<I extends WritableComparable, V extends Writable,
+  E extends Writable, B extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+  /** Logger Instance */
+  private static final Logger LOG = Logger.getLogger(LocalData.class);
+  /**
+   * MappingStore from vertexId - target; null when none is configured or
+   * after {@link #removeMappingStoreIfPossible()} dropped it
+   */
+  private MappingStore<I, B> mappingStore;
+  /** Do operations using mapping store; null when none is configured */
+  private MappingStoreOps<I, B> mappingStoreOps;
+  /**
+   * Constructor
+   *
+   * Set configuration, create & initialize mapping store
+   * @param conf giraph configuration
+   */
+  public LocalData(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // set configuration
+    setConf(conf);
+    // check if user set the mapping store => create & initialize it
+    mappingStore = (MappingStore<I, B>) getConf().createMappingStore();
+    if (mappingStore != null) {
+      mappingStore.initialize();
+    }
+    mappingStoreOps = (MappingStoreOps<I, B>) getConf().createMappingStoreOps();
+    if (mappingStoreOps != null) {
+      mappingStoreOps.initialize(mappingStore);
+    }
+  }
+
+  /**
+   * Get the mapping store.
+   *
+   * @return mapping store, or null if none was configured or it has been
+   *         removed by {@link #removeMappingStoreIfPossible()}
+   */
+  public MappingStore<I, B> getMappingStore() {
+    return mappingStore;
+  }
+
+  /**
+   * Get the operations object that works on the mapping store.
+   *
+   * @return mapping store ops, or null if none was configured
+   */
+  public MappingStoreOps<I, B> getMappingStoreOps() {
+    return mappingStoreOps;
+  }
+
+  /**
+   * Remove mapping store from localData
+   * if mapping data is already embedded into vertexIds
+   */
+  public void removeMappingStoreIfPossible() {
+    // NOTE(review): mappingStoreOps was initialized with this store; whether
+    // dropping this reference actually frees it depends on the ops impl.
+    if (mappingStoreOps != null && mappingStoreOps.hasEmbedding()) {
+      mappingStore = null;
+    }
+  }
+
+  /**
+   * Prints Stats of individual data it stores. Does nothing when no
+   * mapping store is present.
+   */
+  public void printStats() {
+    // mappingStore may be null (not configured, or already removed)
+    if (mappingStore != null && LOG.isInfoEnabled()) {
+      LOG.info("MappingStore has : " + mappingStore.getStats() + " entries");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
new file mode 100644
index 0000000..a2279a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Load as many mapping input splits as possible, storing every entry read
+ * into the worker's mapping store.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class MappingInputSplitsCallable<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends FullInputSplitCallable<I, V, E> {
+  /** User supplied mappingInputFormat */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Link to bspServiceWorker */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+
+  /**
+   * Constructor
+   *
+   * @param mappingInputFormat mappingInputFormat
+   * @param splitOrganizer Input splits organizer
+   * @param context Context
+   * @param configuration Configuration
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param currentIndex Atomic Integer to get splitPath from list
+   * @param bspServiceWorker bsp service worker
+   */
+  public MappingInputSplitsCallable(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      ZooKeeperExt zooKeeperExt,
+      AtomicInteger currentIndex,
+      BspServiceWorker<I, V, E> bspServiceWorker) {
+    super(splitOrganizer, context,
+      configuration, zooKeeperExt, currentIndex);
+    this.mappingInputFormat = mappingInputFormat;
+    this.bspServiceWorker = bspServiceWorker;
+  }
+
+  @Override
+  public GiraphInputFormat getInputFormat() {
+    return mappingInputFormat;
+  }
+
+  /**
+   * Read every mapping entry of one split into the worker's mapping store.
+   * The mapping reader is always closed, even if reading fails.
+   *
+   * @param inputSplit Input split to load
+   * @return Number of mapping entries loaded from this split
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws IllegalStateException if the worker has no mapping store
+   */
+  @Override
+  protected Integer readInputSplit(InputSplit inputSplit)
+    throws IOException, InterruptedException {
+    MappingStore<I, B> mappingStore =
+      (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
+    if (mappingStore == null) {
+      // Fail fast with a clear message instead of an NPE inside the loop
+      throw new IllegalStateException("readInputSplit: No MappingStore " +
+          "available to store mapping entries from " + inputSplit);
+    }
+
+    MappingReader<I, V, E, B> mappingReader =
+        mappingInputFormat.createMappingReader(inputSplit, context);
+    try {
+      mappingReader.setConf(configuration);
+
+      WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker
+          .getAggregatorHandler().newThreadAggregatorUsage();
+
+      mappingReader.initialize(inputSplit, context);
+      mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+
+      int entriesLoaded = 0;
+      while (mappingReader.nextEntry()) {
+        MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
+        entriesLoaded += 1;
+        mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
+      }
+      return entriesLoaded;
+    } finally {
+      // Release the reader's underlying resources for this split
+      mappingReader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
new file mode 100644
index 0000000..21a981e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingInputSplitsCallableFactory<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  implements CallableFactory<Integer> {
+  /** Mapping input format */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Input split organizer */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Current position in the path list */
+  private final AtomicInteger currentIndex;
+
+
+  /**
+   * Constructor.
+   *
+   * @param mappingInputFormat Mapping input format
+   * @param splitOrganizer Input split organizer
+   * @param context Mapper context
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
+   *                     for this worker
+   */
+  public MappingInputSplitsCallableFactory(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
+      ZooKeeperExt zooKeeperExt) {
+    this.mappingInputFormat = mappingInputFormat;
+    this.splitOrganizer = splitOrganizer;
+    this.context = context;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.zooKeeperExt = zooKeeperExt;
+    this.currentIndex = new AtomicInteger(0);
+  }
+
+  @Override
+  public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+    return new MappingInputSplitsCallable<>(
+        mappingInputFormat,
+        splitOrganizer,
+        context,
+        configuration,
+        zooKeeperExt,
+        currentIndex,
+        bspServiceWorker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index e3e04d6..4c85765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -19,12 +19,15 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -50,6 +53,7 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("unchecked")
 public class VertexInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends InputSplitsCallable<I, V, E> {
@@ -69,6 +73,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Filter to select which vertices to keep */
   private final VertexInputFilter<I, V, E> vertexInputFilter;
+  /** Can embedInfo in vertexIds */
+  private final boolean canEmbedInIds;
+  /**
+   * Whether the chosen {@link OutEdges} implementation allows for Edge
+   * reuse.
+   */
+  private boolean reuseEdgeObjects;
+  /** Used to translate Edges during vertex input phase based on localData */
+  private final TranslateEdge<I, E> translateEdge;
 
   // Metrics
   /** number of vertices loaded meter across all readers */
@@ -102,6 +115,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;
     vertexInputFilter = configuration.getVertexInputFilter();
+    reuseEdgeObjects = configuration.reuseEdgeObjects();
+    canEmbedInIds = bspServiceWorker
+        .getLocalData()
+        .getMappingStoreOps() != null &&
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .hasEmbedding();
+    translateEdge = bspServiceWorker.getTranslateEdge();
 
     // Initialize Metrics
     totalVerticesMeter = getTotalVerticesLoadedMeter();
@@ -151,6 +173,12 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
             "readInputSplit: Vertex reader returned a vertex " +
                 "without an id!  - " + readerVertex);
       }
+      if (canEmbedInIds) {
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(readerVertex.getId());
+      }
       if (readerVertex.getValue() == null) {
         readerVertex.setValue(configuration.createVertexValue());
       }
@@ -167,6 +195,37 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         continue;
       }
 
+      // Before saving to partition-store translate all edges (if present)
+      if (translateEdge != null) {
+        // only iff vertexInput reads edges also
+        if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
+          OutEdges<I, E> vertexOutEdges = configuration
+              .createAndInitializeOutEdges(readerVertex.getNumEdges());
+          // TODO : this works for generic OutEdges, can create a better api
+          // to support more efficient translation for specific types
+
+          // NOTE : for implementations where edge is reusable, space is
+          // consumed by the OutEdges data structure itself, but if not reusable
+          // space is consumed by the newly created edge -> and the new OutEdges
+          // data structure just holds a reference to the newly created edge
+          // so in any way we virtually hold edges twice - similar to
+          // OutEdges.trim() -> this has the same complexity as OutEdges.trim()
+          for (Edge<I, E> edge : readerVertex.getEdges()) {
+            if (reuseEdgeObjects) {
+              bspServiceWorker
+                  .getLocalData()
+                  .getMappingStoreOps()
+                  .embedTargetInfo(edge.getTargetVertexId());
+              vertexOutEdges.add(edge); // edge can be re-used
+            } else { // edge objects cannot be reused - so create new edges
+              vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
+            }
+          }
+          // set out edges to translated instance -> old instance is released
+          readerVertex.setEdges(vertexOutEdges);
+        }
+      }
+
       PartitionOwner partitionOwner =
           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
       workerClientRequestProcessor.sendVertexRequest(

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
index 4e19cd2..96bd5d7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.partition;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -47,6 +48,7 @@ public class SimpleRangePartitionFactoryTest {
     ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
     for (int i = 0; i < numWorkers; i++) {
       WorkerInfo info = new WorkerInfo();
+      info.setInetSocketAddress(new InetSocketAddress(8080));
       info.setTaskId(i);
       infos.add(info);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 603910b..fbc24f8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -31,6 +31,8 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.mapping.HiveMappingInputFormat;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
@@ -55,6 +57,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
@@ -87,6 +90,8 @@ public class HiveGiraphRunner implements Tool {
   private List<EdgeInputFormatDescription> edgeInputDescriptions =
       Lists.newArrayList();
 
+  /** HiveToMapping class for mapping input; null when not set */
+  private Class<? extends HiveToMapping> hiveToMappingClass;
   /** Hive Vertex writer */
   private Class<? extends VertexToHive> vertexToHiveClass;
   /** Skip output? (Useful for testing without writing) */
@@ -238,6 +243,36 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
+   * Check whether a HiveToMapping class has been set for mapping input
+   *
+   * @return true if a HiveToMapping class is configured
+   */
+  public boolean hasMappingInput() {
+    return hiveToMappingClass != null;
+  }
+
+  /**
+   * Store mapping input class, profile id, table and filter in conf
+   *
+   * @param hiveToMappingClass class for reading mapping entries from Hive.
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   */
+  public void setMappingInput(
+      Class<? extends HiveToMapping> hiveToMappingClass, String tableName,
+      String partitionFilter) {
+    this.hiveToMappingClass = hiveToMappingClass;
+    conf.set(HIVE_MAPPING_INPUT.getClassOpt().getKey(),
+        hiveToMappingClass.getName());
+    conf.set(HIVE_MAPPING_INPUT.getProfileIdOpt().getKey(),
+        "mapping_input_profile");
+    conf.set(HIVE_MAPPING_INPUT.getTableOpt().getKey(), tableName);
+    if (partitionFilter != null) {
+      conf.set(HIVE_MAPPING_INPUT.getPartitionOpt().getKey(), partitionFilter);
+    }
+  }
+
+  /**
    * main method
    * @param args system arguments
    * @throws Exception any errors from Hive Giraph Runner
@@ -425,6 +460,22 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
+   * Set HiveMappingInputFormat as the mapping input format and check specs.
+   *
+   * NOTE(review): specs are checked on a copy of conf, so anything written
+   * during the check is not kept in conf; confirm that is intended.
+   */
+  public void prepareHiveMappingInput() {
+    GiraphConstants.MAPPING_INPUT_FORMAT_CLASS.set(conf,
+        HiveMappingInputFormat.class);
+
+    Configuration confCopy = new Configuration(conf);
+    createGiraphConf(confCopy)
+        .createWrappedMappingInputFormat()
+        .checkInputSpecs(confCopy);
+  }
+
+  /**
    * process arguments
    * @param args to process
    * @return CommandLine instance
@@ -458,6 +509,17 @@ public class HiveGiraphRunner implements Tool {
               " class name (-computationClass) to use");
     }
 
+    String mappingInput = cmdln.getOptionValue("mappingInput");
+    if (mappingInput != null) { // class,table[,partitionFilter]
+      String[] parameters = split(mappingInput, ",", 3);
+      if (parameters.length < 2) {
+        throw new IllegalStateException("Illegal mappingInput description " +
+            mappingInput + " - HiveToMapping class and table name needed");
+      }
+      setMappingInput(findClass(parameters[0], HiveToMapping.class),
+          parameters[1], elementOrNull(parameters, 2));
+    }
+
     String[] vertexInputs = cmdln.getOptionValues("vertexInput");
     if (vertexInputs != null && vertexInputs.length != 0) {
       vertexInputDescriptions.clear();
@@ -534,6 +596,11 @@ public class HiveGiraphRunner implements Tool {
     // allow metastore changes (i.e. creating tables that don't exist)
     processMoreArguments(cmdln);
 
+    if (hasMappingInput()) { // CLI or processMoreArguments may have set it
+      HIVE_MAPPING_INPUT.getDatabaseOpt().set(conf, dbName);
+      prepareHiveMappingInput();
+    }
+
     if (hasVertexInput()) {
       HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, dbName);
       prepareHiveVertexInputs();
@@ -573,6 +640,12 @@ public class HiveGiraphRunner implements Tool {
 
     options.addOption("db", "dbName", true, "Hive database name");
 
+    // Mapping input settings
+    options.addOption("mi", "mappingInput", true, "Giraph " +
+        HiveToMapping.class.getSimpleName() + " class to use, table name and " +
+        "partition filter (optional). Example:\n" +
+        "\"MyHiveToMapping, myTableName, a=1,b=two\"");
+
     // Vertex input settings
     options.addOption("vi", "vertexInput", true, getInputOptionDescription(
         "vertex", HiveToVertex.class.getSimpleName()));

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
index c7ad63b..ab533a2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.output.VertexToHive;
@@ -28,6 +29,9 @@ import org.apache.giraph.hive.output.VertexToHive;
  * Constants for giraph-hive
  */
 public class GiraphHiveConstants {
+  /** Options for configuring mapping input read by a HiveToMapping */
+  public static final HiveInputOptions<HiveToMapping> HIVE_MAPPING_INPUT =
+      new HiveInputOptions<>("mapping", HiveToMapping.class);
   /** Options for configuring vertex input */
   public static final HiveInputOptions<HiveToVertex> HIVE_VERTEX_INPUT =
       new HiveInputOptions<HiveToVertex>("vertex", HiveToVertex.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index 2388673..35d8b3e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.output.VertexToHive;
@@ -46,12 +47,14 @@ import java.util.Map;
 
 import static java.lang.System.getenv;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
 
 /**
  * Utility methods for Hive IO
  */
+@SuppressWarnings("unchecked") // NOTE(review): narrow to needed methods
 public class HiveUtils {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(HiveUtils.class);
@@ -342,6 +345,31 @@ public class HiveUtils {
   }
 
   /**
+   * Create the HiveToMapping class configured in HIVE_MAPPING_INPUT.
+   *
+   * @param conf ImmutableClassesGiraphConfiguration
+   * @param schema HiveTableSchema
+   * @param <I> vertexId type
+   * @param <V> vertexValue type
+   * @param <E> edgeValue type
+   * @param <B> mappingTarget type, inferred from the call site
+   * @return new HiveToMapping; IllegalArgumentException if no class is set
+   */
+  public static <I extends WritableComparable, V extends Writable,
+    E extends Writable, B extends Writable>
+  HiveToMapping<I, B> newHiveToMapping(
+    ImmutableClassesGiraphConfiguration<I, V, E> conf,
+    HiveTableSchema schema) {
+    Class<? extends HiveToMapping> klass = HIVE_MAPPING_INPUT.getClass(conf);
+    if (klass == null) {
+      throw new IllegalArgumentException(
+          HIVE_MAPPING_INPUT.getClassOpt().getKey() + " not set in conf"
+      );
+    }
+    return newInstance(klass, conf, schema);
+  }
+
+  /**
    * Create a new instance of a class, configuring it and setting the Hive table
    * schema if it supports those types.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
new file mode 100644
index 0000000..dc7a6ee
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base class for HiveToMapping implementations; remove() is unsupported.
+ *
+ * @param <I> vertexId type parameter
+ * @param <B> mapping target type parameter
+ */
+public abstract class AbstractHiveToMapping<I extends WritableComparable,
+    B extends Writable>
+    extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+    implements HiveToMapping<I, B> {
+  @Override
+  public final void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
new file mode 100644
index 0000000..973813d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.io.iterables.MappingReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.giraph.hive.common.HiveUtils.newHiveToMapping;
+
+/**
+ * MappingInputFormat that reads mapping entries from a Hive table.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingInputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingInputFormat<I, V, E, B> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create mapping input format
+   */
+  public HiveMappingInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+  }
+
+  @Override
+  public void checkInputSpecs(Configuration conf) {
+    HiveInputDescription inputDesc =
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf);
+    HiveTableSchema schema = getTableSchema();
+    HiveToMapping<I, B> hiveToMapping = newHiveToMapping(getConf(), schema);
+    hiveToMapping.checkInput(inputDesc, schema);
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
+    hiveInputFormat.initialize(
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf),
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.getProfileID(conf),
+        conf);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    // minSplitCountHint is not passed on to HiveApiInputFormat
+    return hiveInputFormat.getSplits(context);
+  }
+
+  @Override
+  public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    HiveMappingReader<I, B> reader = new HiveMappingReader<>();
+    reader.setTableSchema(getTableSchema());
+
+    RecordReader<WritableComparable, HiveReadableRecord> baseReader;
+    try {
+      baseReader = hiveInputFormat.createRecordReader(split, context);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag: only an IOException can be thrown here
+      Thread.currentThread().interrupt();
+      throw new IOException("Could not create map reader", e);
+    }
+
+    reader.setHiveRecordReader(baseReader);
+    return new MappingReaderWrapper<>(reader);
+  }
+
+  /**
+   * Get Hive table schema
+   *
+   * @return Hive table schema
+   */
+  private HiveTableSchema getTableSchema() {
+    return hiveInputFormat.getTableSchema(getConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
new file mode 100644
index 0000000..3154f9d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
+import org.apache.giraph.io.iterables.GiraphReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * GiraphReader of mapping entries backed by a Hive RecordReader
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingReader<I extends WritableComparable,
+  B extends Writable>
+  extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+  implements GiraphReader<MappingEntry<I, B>> {
+  /** Underlying Hive RecordReader used */
+  private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
+  /** Converts Hive records to mapping entries; created in initialize() */
+  private HiveToMapping<I, B> hiveToMapping;
+
+  /**
+   * Get the underlying Hive record reader
+   *
+   * @return hiveRecordReader
+   */
+  public RecordReader<WritableComparable, HiveReadableRecord>
+  getHiveRecordReader() {
+    return hiveRecordReader;
+  }
+
+  public void setHiveRecordReader( // must be set before initialize()
+      RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
+    this.hiveRecordReader = hiveRecordReader;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    hiveRecordReader.initialize(inputSplit, context);
+    hiveToMapping = HiveUtils.newHiveToMapping(getConf(), getTableSchema());
+    hiveToMapping.initializeRecords(
+        new RecordReaderWrapper<>(hiveRecordReader));
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hiveToMapping.hasNext();
+  }
+
+
+  @Override
+  public MappingEntry<I, B> next() {
+    return hiveToMapping.next();
+  }
+
+  @Override
+  public void remove() {
+    hiveToMapping.remove();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
new file mode 100644
index 0000000..497b044
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.input.HiveInputChecker;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Iterator of MappingEntry objects built from Hive records.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface HiveToMapping<I extends WritableComparable,
+    B extends Writable> extends
+    Iterator<MappingEntry<I, B>>, HiveInputChecker {
+  /**
+   * Set the records which contain mapping input data
+   *
+   * @param records Hive records
+   */
+  void initializeRecords(Iterator<HiveReadableRecord> records);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
new file mode 100644
index 0000000..feccc1f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Base HiveToMapping that builds one MappingEntry per Hive record.
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public abstract class SimpleHiveToMapping<I extends WritableComparable,
+  B extends Writable> extends AbstractHiveToMapping<I, B> {
+  /** Hive records which we are reading from */
+  private Iterator<HiveReadableRecord> records;
+
+  /** Reusable entry object */
+  private MappingEntry<I, B> reusableEntry;
+
+  /** Reusable vertex id */
+  private I reusableVertexId;
+  /** Reusable mapping target */
+  private B reusableMappingTarget;
+
+  /**
+   * Read vertexId from hive record
+   *
+   * @param record HiveReadableRecord
+   * @return vertexId
+   */
+  public abstract I getVertexId(HiveReadableRecord record);
+
+  /**
+   * Read mappingTarget from hive record
+   *
+   * @param record HiveReadableRecord
+   * @return mappingTarget
+   */
+  public abstract B getMappingTarget(HiveReadableRecord record);
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
+    this.records = records;
+    reusableVertexId = getConf().createVertexId();
+    reusableMappingTarget = (B) getConf().createMappingTarget();
+    reusableEntry = new MappingEntry<>(reusableVertexId,
+        reusableMappingTarget);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return records.hasNext();
+  }
+
+  @Override
+  public MappingEntry<I, B> next() {
+    HiveReadableRecord record = records.next();
+    I id = getVertexId(record);
+    B target = getMappingTarget(record);
+    reusableEntry.setVertexId(id);
+    reusableEntry.setMappingTarget(target);
+    return reusableEntry; // same instance on every call
+  }
+
+  /**
+   * Returns reusableVertexId for use in other methods
+   *
+   * @return reusableVertexId
+   */
+  public I getReusableVertexId() {
+    return reusableVertexId;
+  }
+
+  /**
+   * Returns reusableMappingTarget for use in other methods
+   *
+   * @return reusableMappingTarget
+   */
+  public B getReusableMappingTarget() {
+    return reusableMappingTarget;
+  }
+}