You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/04 00:28:27 UTC

[2/2] git commit: updated refs/heads/trunk to c5a87d1

GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c5a87d16
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c5a87d16
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c5a87d16

Branch: refs/heads/trunk
Commit: c5a87d16119c7e9a9f2af80a5534c4fe37605dd3
Parents: ff970f2
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Fri May 3 15:14:35 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Fri May 3 15:14:35 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   10 +-
 .../conf/ImmutableClassesGiraphConfiguration.java  |   14 +
 .../java/org/apache/giraph/io/EdgeInputFormat.java |    6 +-
 .../org/apache/giraph/io/GiraphInputFormat.java    |   47 ++-
 .../org/apache/giraph/io/VertexInputFormat.java    |    8 +-
 .../formats/multi/EdgeInputFormatDescription.java  |  145 +++++
 .../io/formats/multi/InputFormatDescription.java   |  167 ++++++
 .../multi/InputSplitWithInputFormatIndex.java      |   67 +++
 .../io/formats/multi/MultiEdgeInputFormat.java     |   93 ++++
 .../giraph/io/formats/multi/MultiInputUtils.java   |  103 ++++
 .../io/formats/multi/MultiVertexInputFormat.java   |   95 ++++
 .../multi/VertexInputFormatDescription.java        |  151 +++++
 .../giraph/io/formats/multi/package-info.java      |   22 +
 .../giraph/io/internal/WrappedEdgeInputFormat.java |   15 +
 .../io/internal/WrappedVertexInputFormat.java      |   15 +
 .../org/apache/giraph/master/BspServiceMaster.java |   14 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |    2 +
 .../giraph/worker/EdgeInputSplitsCallable.java     |   12 +-
 .../worker/EdgeInputSplitsCallableFactory.java     |    7 +
 .../apache/giraph/worker/InputSplitsCallable.java  |   16 +-
 .../giraph/worker/VertexInputSplitsCallable.java   |   13 +-
 .../worker/VertexInputSplitsCallableFactory.java   |    7 +
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |  427 +++++++++-----
 24 files changed, 1271 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7c906d2..4d2c90c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
+
   GIRAPH-653: Hadoop_non_secure broken (majakabiljo)
 
   GIRAPH-650: Exception in GiraphConfiguration initialization (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index deb2069..45a29ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -185,7 +185,7 @@ public class GiraphConfiguration extends Configuration
    *
    * @param vertexInputFormatClass Determines how graph is input
    */
-  public final void setVertexInputFormatClass(
+  public void setVertexInputFormatClass(
       Class<? extends VertexInputFormat> vertexInputFormatClass) {
     VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
   }
@@ -204,7 +204,7 @@ public class GiraphConfiguration extends Configuration
    *
    * @param edgeInputFormatClass Determines how graph is input
    */
-  public final void setEdgeInputFormatClass(
+  public void setEdgeInputFormatClass(
       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
     EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
   }
@@ -955,8 +955,10 @@ public class GiraphConfiguration extends Configuration
    * @param conf Configuration
    */
   public void updateConfiguration(Configuration conf) {
-    for (Map.Entry<String, String> parameter : giraphSetParameters) {
-      conf.set(parameter.getKey(), parameter.getValue());
+    if (this != conf) {
+      for (Map.Entry<String, String> parameter : giraphSetParameters) {
+        conf.set(parameter.getKey(), parameter.getValue());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f5a926f..f992b37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -203,6 +203,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   @Override
+  public void setVertexInputFormatClass(
+      Class<? extends VertexInputFormat> vertexInputFormatClass) {
+    super.setVertexInputFormatClass(vertexInputFormatClass);
+    classes.setVertexInputFormatClass(vertexInputFormatClass);
+  }
+
+  @Override
   public boolean hasVertexOutputFormat() {
     return classes.hasVertexOutputFormat();
   }
@@ -305,6 +312,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     return wrappedEdgeInputFormat;
   }
 
+  @Override
+  public void setEdgeInputFormatClass(
+      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+    super.setEdgeInputFormatClass(edgeInputFormatClass);
+    classes.setEdgeInputFormatClass(edgeInputFormatClass);
+  }
+
   /**
    * Get the user's subclassed {@link AggregatorWriter}.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
index 2aac1f0..1f4451f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,10 +40,7 @@ import java.io.IOException;
  * @param <E> Edge data
  */
 public abstract class EdgeInputFormat<I extends WritableComparable,
-    E extends Writable>
-    extends
-    DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
-    implements GiraphInputFormat {
+    E extends Writable> extends GiraphInputFormat<I, Writable, E> {
   /**
    * Create an edge reader for a given split. The framework will call
    * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 13efc93..58d79a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -18,15 +18,29 @@
 
 package org.apache.giraph.io;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
  */
-public interface GiraphInputFormat {
+public abstract class GiraphInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
   /**
    * Get the list of input splits for the format.
    *
@@ -36,6 +50,33 @@ public interface GiraphInputFormat {
    * @throws IOException
    * @throws InterruptedException
    */
-  List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
-    throws IOException, InterruptedException;
+  public abstract List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException;
+
+  /**
+   * Write the split's class name, followed by its Writable payload.
+   *
+   * @param inputSplit InputSplit to write; it is cast to {@link Writable}
+   * @param dataOutput DataOutput to write to
+   */
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    Text.writeString(dataOutput, inputSplit.getClass().getName());
+    ((Writable) inputSplit).write(dataOutput);
+  }
+
+  /**
+   * Read a class name, instantiate that split class and read its fields.
+   *
+   * @param dataInput DataInput holding a class name and the split's fields
+   * @return InputSplit of the class named in the input
+   */
+  public InputSplit readInputSplit(DataInput dataInput) throws IOException,
+      ClassNotFoundException {
+    String inputSplitClass = Text.readString(dataInput);
+    InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance(
+            getConf().getClassByName(inputSplitClass), getConf());
+    ((Writable) inputSplit).readFields(dataInput);
+    return inputSplit;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
index c4d7fe2..6c8adc2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,20 +44,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
-    implements GiraphInputFormat {
+    V extends Writable, E extends Writable> extends GiraphInputFormat<I, V, E> {
   /**
    * Create a vertex reader for a given split. Guaranteed to have been
    * configured with setConf() prior to use.  The framework will also call
-   * {@linkVertexReader#initialize(InputSplit, TaskAttemptContext)} before
+   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
    * the split is used.
    *
    * @param split the split to be read
    * @param context the information about the task
    * @return a new record reader
    * @throws IOException
-   * @throws InterruptedException
    */
   public abstract VertexReader<I, V, E> createVertexReader(
       InputSplit split,

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
new file mode 100644
index 0000000..569cee9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the edge input format - holds edge input format class and all
+ * parameters specifically set for that edge input format.
+ *
+ * Used only with {@link MultiEdgeInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeInputFormatDescription<I extends WritableComparable,
+    E extends Writable> extends InputFormatDescription<EdgeInputFormat<I, E>> {
+  /**
+   * EdgeInputFormats description - JSON array containing a JSON array for
+   * each edge input. Edge input JSON arrays contain one or two elements -
+   * first one is the name of edge input class, and second one is JSON object
+   * with all specific parameters for this edge input. For example:
+   * [["EIF1",{"p":"v1"}],["EIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption EDGE_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiEdgeInput.descriptions", null);
+
+  /**
+   * Constructor with edge input format class
+   *
+   * @param edgeInputFormatClass Edge input format class
+   */
+  public EdgeInputFormatDescription(
+      Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass) {
+    super(edgeInputFormatClass);
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format
+   */
+  public EdgeInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Create a copy of configuration which additionally has all parameters for
+   * this input format set
+   *
+   * @param conf Configuration which we want to create a copy from
+   * @return Copy of configuration
+   */
+  private ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>(conf);
+    confCopy.setEdgeInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(confCopy);
+    return confCopy;
+  }
+
+  /**
+   * Get descriptions of edge input formats, one per nested JSON array.
+   *
+   * @param conf Configuration to read the descriptions option from
+   * @param <I>  Vertex id
+   * @param <E>  Edge data
+   * @return List of edge input format descriptions, empty if option is unset
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  List<EdgeInputFormatDescription<I, E>> getEdgeInputFormatDescriptions(
+      Configuration conf) {
+    String edgeInputFormatDescriptions =
+        EDGE_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (edgeInputFormatDescriptions == null) {
+      return Lists.newArrayList();
+    }
+    try {
+      JSONArray inputFormatsJson = new JSONArray(edgeInputFormatDescriptions);
+      List<EdgeInputFormatDescription<I, E>> descriptions =
+          Lists.newArrayListWithCapacity(inputFormatsJson.length());
+      for (int i = 0; i < inputFormatsJson.length(); i++) {
+        descriptions.add(new EdgeInputFormatDescription<I, E>(
+            inputFormatsJson.getJSONArray(i).toString()));
+      }
+      return descriptions;
+    } catch (JSONException e) {
+      throw new IllegalStateException("getEdgeInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          edgeInputFormatDescriptions, e);
+    }
+  }
+
+  /**
+   * Create all edge input formats
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @return List with all edge input formats
+   */
+  public static <I extends WritableComparable,
+      E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    List<EdgeInputFormatDescription<I, E>> descriptions =
+        getEdgeInputFormatDescriptions(conf);
+    List<EdgeInputFormat<I, E>> edgeInputFormats =
+        Lists.newArrayListWithCapacity(descriptions.size());
+    for (EdgeInputFormatDescription<I, E> description : descriptions) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+          description.createConfigurationCopy(conf);
+      edgeInputFormats.add(confCopy.createWrappedEdgeInputFormat());
+    }
+    return edgeInputFormats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
new file mode 100644
index 0000000..de34a1f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Description of the input format - holds input format class and all
+ * parameters specifically set for that input format.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ *
+ * @param <IF> Input format type
+ */
+public abstract class InputFormatDescription<IF extends GiraphInputFormat> {
+  /** Input format class */
+  private Class<? extends IF> inputFormatClass;
+  /** Parameters set specifically for this input format */
+  private final Map<String, String> parameters = Maps.newHashMap();
+
+  /**
+   * Constructor with input format class
+   *
+   * @param inputFormatClass Input format class
+   */
+  public InputFormatDescription(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format
+   */
+  @SuppressWarnings("unchecked")
+  public InputFormatDescription(String description) {
+    try {
+      JSONArray jsonArray = new JSONArray(description);
+      inputFormatClass =
+          (Class<? extends IF>) Class.forName(jsonArray.getString(0));
+      if (jsonArray.length() > 1) {
+        addParameters(jsonArray.getJSONObject(1));
+      }
+    } catch (JSONException e) {
+      throw new IllegalStateException(
+          "Failed to parse JSON " + description, e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Couldn't find " +
+          "input format class from description " + description, e);
+    }
+  }
+
+  /**
+   * Add parameter to this input format description
+   *
+   * @param name  Parameter name
+   * @param value Parameter value
+   */
+  public void addParameter(String name, String value) {
+    parameters.put(name, value);
+  }
+
+  /**
+   * Add all parameters from json object, storing each value as a string
+   *
+   * @param parametersJson Json object to read parameters from
+   */
+  public void addParameters(JSONObject parametersJson) {
+    Iterator<?> keys = parametersJson.keys();
+    while (keys.hasNext()) {
+      String key = (String) keys.next();
+      try {
+        addParameter(key, parametersJson.get(key).toString());
+      } catch (JSONException e) {
+        throw new IllegalStateException("addParameters: Failed to parse " +
+            parametersJson, e);
+      }
+    }
+  }
+
+  /**
+   * Convert input format description to json array
+   *
+   * @return Json array representing this input format description
+   */
+  public JSONArray toJsonArray() {
+    JSONArray jsonArray = new JSONArray();
+    jsonArray.put(inputFormatClass.getName());
+    JSONObject jsonParameters = new JSONObject();
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      try {
+        jsonParameters.put(entry.getKey(), entry.getValue());
+      } catch (JSONException e) {
+        throw new IllegalStateException("toJsonArray: JSONException occurred " +
+            "while trying to process (" + entry.getKey() + ", " +
+            entry.getValue() + ")", e);
+      }
+    }
+    jsonArray.put(jsonParameters);
+    return jsonArray;
+  }
+
+  public Class<? extends IF> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Put parameters from this input format description to configuration
+   *
+   * @param conf Configuration to put parameters to
+   */
+  public void putParametersToConfiguration(Configuration conf) {
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonArray().toString();
+  }
+
+  /**
+   * Create JSON string for the InputFormatDescriptions
+   *
+   * @param descriptions InputFormatDescriptions
+   * @return JSON string describing these InputFormatDescriptions
+   */
+  public static String toJsonString(
+      Iterable<? extends InputFormatDescription> descriptions) {
+    JSONArray jsonArray = new JSONArray();
+    for (InputFormatDescription description : descriptions) {
+      jsonArray.put(description.toJsonArray());
+    }
+    return jsonArray.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
new file mode 100644
index 0000000..c6bba29
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * Helper InputSplit class which holds the information about input format
+ * which owns this input split.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ */
+class InputSplitWithInputFormatIndex extends InputSplit {
+  /** Wrapped input split; length and locations are delegated to it */
+  private final InputSplit split;
+  /** Index of input format which owns the input split */
+  private final int inputFormatIndex;
+
+  /**
+   * Wrap a split together with the index of its owning input format
+   *
+   * @param split            Input split
+   * @param inputFormatIndex Index of input format which owns the input split
+   */
+  InputSplitWithInputFormatIndex(InputSplit split, int inputFormatIndex) {
+    this.inputFormatIndex = inputFormatIndex;
+    this.split = split;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return split.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return split.getLocations();
+  }
+
+  int getInputFormatIndex() {
+    return inputFormatIndex;
+  }
+
+  InputSplit getSplit() {
+    return split;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
new file mode 100644
index 0000000..113b3bc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Edge input format which wraps several edge input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class MultiEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Edge input formats */
+  private List<EdgeInputFormat<I, E>> edgeInputFormats;
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    super.setConf(conf);
+    edgeInputFormats =
+        EdgeInputFormatDescription.createEdgeInputFormats(getConf());
+    if (edgeInputFormats.isEmpty()) {
+      throw new IllegalStateException("setConf: Using MultiEdgeInputFormat " +
+          "without specifying edge inputs");
+    }
+  }
+
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    if (inputSplit instanceof InputSplitWithInputFormatIndex) {
+      InputSplitWithInputFormatIndex split =
+          (InputSplitWithInputFormatIndex) inputSplit;
+      EdgeInputFormat<I, E> edgeInputFormat =
+          edgeInputFormats.get(split.getInputFormatIndex());
+      return edgeInputFormat.createEdgeReader(split.getSplit(), context);
+    } else {
+      throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
+          " was not created by this class: " + inputSplit.getClass().getName());
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return
+        MultiInputUtils.getSplits(context, minSplitCountHint, edgeInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(inputSplit, dataOutput, edgeInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, edgeInputFormats);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
new file mode 100644
index 0000000..a84a7fd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility methods used by {@link MultiVertexInputFormat} and
+ * {@link MultiEdgeInputFormat}
+ */
+public class MultiInputUtils {
+  /** Do not instantiate */
+  private MultiInputUtils() {
+  }
+
+  /**
+   * Get the list of input splits for all formats. Every split is wrapped in
+   * an {@link InputSplitWithInputFormatIndex} holding the position of its
+   * input format in the list. The hint is passed unchanged to every input
+   * format.
+   *
+   * @param context The job context
+   * @param minSplitCountHint Minimum number of splits to create (hint)
+   * @param inputFormats List of input formats
+   * @return The list of input splits
+   * @throws IOException if one of the input formats fails to create splits
+   * @throws InterruptedException if one of the input formats is interrupted
+   */
+  public static List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    for (int index = 0; index < inputFormats.size(); index++) {
+      List<InputSplit> inputFormatSplits =
+          inputFormats.get(index).getSplits(context, minSplitCountHint);
+      for (InputSplit split : inputFormatSplits) {
+        splits.add(new InputSplitWithInputFormatIndex(split, index));
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * Write input split info to DataOutput. Input split belongs to one of the
+   * formats. The index of that format is written first, followed by the
+   * underlying split as serialized by the format itself.
+   *
+   * @param inputSplit InputSplit
+   * @param dataOutput DataOutput
+   * @param inputFormats List of input formats
+   * @throws IOException if writing to the output fails
+   * @throws IllegalStateException if the split is not an
+   *         {@link InputSplitWithInputFormatIndex}
+   */
+  public static void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException {
+    if (inputSplit instanceof InputSplitWithInputFormatIndex) {
+      InputSplitWithInputFormatIndex split =
+          (InputSplitWithInputFormatIndex) inputSplit;
+      int index = split.getInputFormatIndex();
+      dataOutput.writeInt(index);
+      inputFormats.get(index).writeInputSplit(split.getSplit(), dataOutput);
+    } else {
+      throw new IllegalStateException("writeInputSplit: Got InputSplit which " +
+          "was not created by multi input: " + inputSplit.getClass().getName());
+    }
+  }
+
+  /**
+   * Read input split info from DataInput. Input split belongs to one of the
+   * formats. The format index is read first and selects the format which
+   * reads the rest of the split.
+   *
+   * @param dataInput DataInput
+   * @param inputFormats List of input formats
+   * @return InputSplit
+   * @throws IOException if reading from the input fails
+   * @throws ClassNotFoundException if the selected input format cannot find
+   *         a class needed for the split
+   * @throws IllegalStateException if the index read from the input does not
+   *         refer to any of the input formats
+   */
+  public static InputSplit readInputSplit(
+      DataInput dataInput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      ClassNotFoundException {
+    int index = dataInput.readInt();
+    // The index comes from serialized data, so validate it before use rather
+    // than failing with a bare IndexOutOfBoundsException
+    if (index < 0 || index >= inputFormats.size()) {
+      throw new IllegalStateException("readInputSplit: Got input format " +
+          "index " + index + " but there are only " + inputFormats.size() +
+          " input formats");
+    }
+    InputSplit split = inputFormats.get(index).readInputSplit(dataInput);
+    return new InputSplitWithInputFormatIndex(split, index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
new file mode 100644
index 0000000..631a451
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Vertex input format which wraps several vertex input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class MultiVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends VertexInputFormat<I, V, E> {
+  /** Vertex input formats */
+  private List<VertexInputFormat<I, V, E>> vertexInputFormats;
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    super.setConf(conf);
+    vertexInputFormats =
+        VertexInputFormatDescription.createVertexInputFormats(getConf());
+    if (vertexInputFormats.isEmpty()) {
+      throw new IllegalStateException("setConf: Using MultiVertexInputFormat " +
+          "without specifying vertex inputs");
+    }
+  }
+
+  @Override
+  public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    if (inputSplit instanceof InputSplitWithInputFormatIndex) {
+      InputSplitWithInputFormatIndex split =
+          (InputSplitWithInputFormatIndex) inputSplit;
+      VertexInputFormat<I, V, E> vertexInputFormat =
+          vertexInputFormats.get(split.getInputFormatIndex());
+      return vertexInputFormat.createVertexReader(split.getSplit(), context);
+    } else {
+      throw new IllegalStateException("createVertexReader: Got InputSplit " +
+          "which was not created by this class: " +
+          inputSplit.getClass().getName());
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return MultiInputUtils.getSplits(context, minSplitCountHint,
+        vertexInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(inputSplit, dataOutput, vertexInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, vertexInputFormats);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
new file mode 100644
index 0000000..bdd5a74
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the vertex input format - holds vertex input format class and
+ * all parameters specifically set for that vertex input format.
+ *
+ * Used only with {@link MultiVertexInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class VertexInputFormatDescription<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends InputFormatDescription<VertexInputFormat<I, V, E>> {
+  /**
+   * Option holding the descriptions of all vertex inputs. The value is a JSON
+   * array with one nested JSON array per vertex input. A nested array has one
+   * or two elements: the name of the vertex input class, optionally followed
+   * by a JSON object with the parameters specific to that vertex input.
+   * For example:
+   * [["VIF1",{"p":"v1"}],["VIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption VERTEX_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiVertexInput.descriptions", null);
+
+  /**
+   * Create a description from a vertex input format class
+   *
+   * @param vertexInputFormatClass Vertex input format class
+   */
+  public VertexInputFormatDescription(
+      Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass) {
+    super(vertexInputFormatClass);
+  }
+
+  /**
+   * Create a description from the json string describing the input format
+   *
+   * @param description Json string describing this input format
+   */
+  public VertexInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Copy the configuration, then set this description's input format class
+   * as the vertex input format class of the copy and put this description's
+   * parameters into it.
+   *
+   * @param conf Configuration to copy
+   * @return The adjusted copy of the configuration
+   */
+  private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    ImmutableClassesGiraphConfiguration<I, V, E, Writable> copy =
+        new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(conf);
+    copy.setVertexInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(copy);
+    return copy;
+  }
+
+  /**
+   * Read the vertex input format descriptions stored in the configuration
+   * under {@link #VERTEX_INPUT_FORMAT_DESCRIPTIONS}. When the option is not
+   * set, an empty list is returned.
+   *
+   * @param conf Configuration
+   * @param <I>  Vertex id
+   * @param <V>  Vertex data
+   * @param <E>  Edge data
+   * @return List of vertex input format descriptions
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable>
+  List<VertexInputFormatDescription<I, V, E>> getVertexInputFormatDescriptions(
+      Configuration conf) {
+    String descriptionsJson = VERTEX_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (descriptionsJson == null) {
+      return Lists.newArrayList();
+    }
+    List<VertexInputFormatDescription<I, V, E>> result;
+    try {
+      JSONArray jsonArray = new JSONArray(descriptionsJson);
+      int count = jsonArray.length();
+      result = Lists.newArrayListWithCapacity(count);
+      // Each element of the outer array describes one vertex input
+      for (int index = 0; index < count; index++) {
+        String description = jsonArray.getString(index);
+        result.add(new VertexInputFormatDescription<I, V, E>(description));
+      }
+    } catch (JSONException e) {
+      throw new IllegalStateException("getVertexInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          descriptionsJson, e);
+    }
+    return result;
+  }
+
+  /**
+   * Create one wrapped vertex input format per description found in the
+   * configuration. Each format is created from its own copy of the
+   * configuration, adjusted by the corresponding description.
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @return List with all vertex input formats
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable>
+  List<VertexInputFormat<I, V, E>> createVertexInputFormats(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    List<VertexInputFormatDescription<I, V, E>> allDescriptions =
+        getVertexInputFormatDescriptions(conf);
+    List<VertexInputFormat<I, V, E>> createdFormats =
+        Lists.newArrayListWithCapacity(allDescriptions.size());
+    for (VertexInputFormatDescription<I, V, E> current : allDescriptions) {
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> formatConf =
+          current.createConfigurationCopy(conf);
+      createdFormats.add(formatConf.createWrappedVertexInputFormat());
+    }
+    return createdFormats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
new file mode 100644
index 0000000..2611e9c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for Input formats which wrap several input formats and allow
+ * reading data from multiple sources.
+ */
+package org.apache.giraph.io.formats.multi;

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 9b14727..928b975 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
@@ -74,6 +76,7 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
       @Override
       public void setConf(
           ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+        WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf);
         super.setConf(conf);
         edgeReader.setConf(conf);
       }
@@ -113,4 +116,16 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
       }
     };
   }
+
+  /**
+   * Write the input split to the output by delegating to the original
+   * (wrapped) input format.
+   */
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+  }
+
+  /**
+   * Read an input split from the input by delegating to the original
+   * (wrapped) input format.
+   */
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return originalInputFormat.readInputSplit(dataInput);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index 4f0cfea..ed606e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
@@ -76,6 +78,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
       @Override
       public void setConf(
           ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+        WrappedVertexInputFormat.this.getConf().updateConfiguration(conf);
         super.setConf(conf);
         vertexReader.setConf(conf);
       }
@@ -110,4 +113,16 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
       }
     };
   }
+
+  /**
+   * Write the input split to the output by delegating to the original
+   * (wrapped) input format.
+   */
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+  }
+
+  /**
+   * Read an input split from the input by delegating to the original
+   * (wrapped) input format.
+   */
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return originalInputFormat.readInputSplit(dataInput);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 2c1a679..f00116a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -654,7 +654,8 @@ public class BspServiceMaster<I extends WritableComparable,
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
       taskExecutor.submit(new LogStacktraceCallable<Void>(
-          new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations)));
+          new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
+              writeLocations)));
     }
     taskExecutor.shutdown();
     ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
@@ -1872,6 +1873,8 @@ public class BspServiceMaster<I extends WritableComparable,
    * Upon failure call() throws an exception.
    */
   private class WriteInputSplit implements Callable<Void> {
+    /** Input format */
+    private final GiraphInputFormat inputFormat;
     /** Input split which we are going to write */
     private final InputSplit inputSplit;
     /** Input splits path */
@@ -1884,6 +1887,7 @@ public class BspServiceMaster<I extends WritableComparable,
     /**
      * Constructor
      *
+     * @param inputFormat Input format
      * @param inputSplit Input split which we are going to write
      * @param inputSplitsPath Input splits path
      * @param index Index of the input split
@@ -1891,10 +1895,12 @@ public class BspServiceMaster<I extends WritableComparable,
      *                       be used by workers for prioritizing local splits
      *                       when reading)
      */
-    public WriteInputSplit(InputSplit inputSplit,
+    public WriteInputSplit(GiraphInputFormat inputFormat,
+                           InputSplit inputSplit,
                            String inputSplitsPath,
                            int index,
                            boolean writeLocations) {
+      this.inputFormat = inputFormat;
       this.inputSplit = inputSplit;
       this.inputSplitsPath = inputSplitsPath;
       this.index = index;
@@ -1926,9 +1932,7 @@ public class BspServiceMaster<I extends WritableComparable,
               locations == null ? "" : locations.toString());
         }
 
-        Text.writeString(outputStream,
-            inputSplit.getClass().getName());
-        ((Writable) inputSplit).write(outputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
         inputSplitPath = inputSplitsPath + "/" + index;
         getZkExt().createExt(inputSplitPath,
             byteArrayOutputStream.toByteArray(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 01937ab..51edbac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -313,6 +313,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getConfiguration().createWrappedVertexInputFormat(),
             getContext(),
             graphState,
             getConfiguration(),
@@ -350,6 +351,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getConfiguration().createWrappedEdgeInputFormat(),
             getContext(),
             graphState,
             getConfiguration(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index de1e774..83fe5ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -57,6 +57,8 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
       EdgeInputSplitsCallable.class);
+  /** Edge input format */
+  private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
 
@@ -67,6 +69,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param edgeInputFormat Edge input format
    * @param context Context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -75,6 +78,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
+      EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -83,6 +87,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
         splitsHandler, zooKeeperExt);
+    this.edgeInputFormat = edgeInputFormat;
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 
@@ -90,6 +95,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
+  /**
+   * Get the edge input format which was passed to the constructor.
+   *
+   * @return Edge input format
+   */
+  @Override
+  public EdgeInputFormat<I, E> getInputFormat() {
+    return edgeInputFormat;
+  }
+
   /**
    * Read edges from input split.  If testing, the user may request a
    * maximum number of edges to be read from an input split.
@@ -105,8 +115,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       InputSplit inputSplit,
       GraphState<I, V, E, M> graphState) throws IOException,
       InterruptedException {
-    EdgeInputFormat<I, E> edgeInputFormat =
-        configuration.createWrappedEdgeInputFormat();
     EdgeReader<I, E> edgeReader =
         edgeInputFormat.createEdgeReader(inputSplit, context);
     edgeReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 4a1705b..33fb515 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements CallableFactory<VertexEdgeCount> {
+  /** Edge input format */
+  private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */
@@ -54,6 +57,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param edgeInputFormat Edge input format
    * @param context Mapper context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -62,12 +66,14 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public EdgeInputSplitsCallableFactory(
+      EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
+    this.edgeInputFormat = edgeInputFormat;
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
@@ -79,6 +85,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   @Override
   public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new EdgeInputSplitsCallable<I, V, E, M>(
+        edgeInputFormat,
         context,
         graphState,
         configuration,

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index a3a9ab7..f7a8340 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MeterDesc;
 import org.apache.giraph.time.SystemTime;
@@ -34,7 +35,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
@@ -118,6 +118,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
+   * Get input format
+   *
+   * @return Input format
+   */
+  public abstract GiraphInputFormat getInputFormat();
+
+  /**
    * Get Meter tracking edges loaded
    *
    * @return Meter tracking edges loaded
@@ -255,12 +262,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     if (useLocality) {
       Text.readString(inputStream); // location data unused here, skip
     }
-    String inputSplitClass = Text.readString(inputStream);
-    InputSplit inputSplit = (InputSplit)
-        ReflectionUtils.newInstance(
-            configuration.getClassByName(inputSplitClass),
-            configuration);
-    ((Writable) inputSplit).readFields(inputStream);
+    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("getInputSplit: Reserved " + inputSplitPath +

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 224856e..d32ccaf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.partition.PartitionOwner;
@@ -57,6 +58,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(VertexInputSplitsCallable.class);
+  /** Vertex input format used to create vertex readers and read splits */
+  private final VertexInputFormat<I, V, E> vertexInputFormat;
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
   /** Bsp service worker (only use thread-safe methods) */
@@ -71,6 +74,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param vertexInputFormat Vertex input format
    * @param context Context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -79,6 +83,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
+      VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -87,6 +92,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
         splitsHandler, zooKeeperExt);
+    this.vertexInputFormat = vertexInputFormat;
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;
@@ -96,6 +102,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
+  @Override
+  public GiraphInputFormat getInputFormat() {
+    return vertexInputFormat;
+  }
+
   /**
    * Read vertices from input split.  If testing, the user may request a
    * maximum number of vertices to be read from an input split.
@@ -111,8 +122,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       InputSplit inputSplit,
       GraphState<I, V, E, M> graphState)
     throws IOException, InterruptedException {
-    VertexInputFormat<I, V, E> vertexInputFormat =
-        configuration.createWrappedVertexInputFormat();
     VertexReader<I, V, E> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 4eff3b8..cf5e8ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 public class VertexInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements CallableFactory<VertexEdgeCount> {
+  /** Vertex input format handed to every callable this factory creates */
+  private final VertexInputFormat<I, V, E> vertexInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */
@@ -54,6 +57,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param vertexInputFormat Vertex input format
    * @param context Mapper context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -62,12 +66,14 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public VertexInputSplitsCallableFactory(
+      VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
+    this.vertexInputFormat = vertexInputFormat;
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
@@ -79,6 +85,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   @Override
   public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new VertexInputSplitsCallable<I, V, E, M>(
+        vertexInputFormat,
         context,
         graphState,
         configuration,