You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/04 00:28:26 UTC

[1/2] GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)

Updated Branches:
  refs/heads/trunk ff970f2db -> c5a87d161


http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 8d67c1d..98be881 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
@@ -33,14 +34,20 @@ import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
 import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.io.formats.multi.EdgeInputFormatDescription;
+import org.apache.giraph.io.formats.multi.InputFormatDescription;
+import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat;
+import org.apache.giraph.io.formats.multi.MultiVertexInputFormat;
+import org.apache.giraph.io.formats.multi.VertexInputFormatDescription;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.File;
@@ -81,11 +88,13 @@ public class HiveGiraphRunner implements Tool {
   /** vertex class. */
   private Class<? extends Vertex> vertexClass;
 
-  /** Vertex creator from hive records. */
-  private Class<? extends HiveToVertex> hiveToVertexClass;
+  /** Descriptions of vertex input formats */
+  private List<VertexInputFormatDescription> vertexInputDescriptions =
+      Lists.newArrayList();
 
-  /** Edge creator from hive records. */
-  private Class<? extends HiveToEdge> hiveToEdgeClass;
+  /** Descriptions of edge input formats */
+  private List<EdgeInputFormatDescription> edgeInputDescriptions =
+      Lists.newArrayList();
 
   /** Hive Vertex writer */
   private Class<? extends VertexToHive> vertexToHiveClass;
@@ -108,51 +117,100 @@ public class HiveGiraphRunner implements Tool {
     this.vertexClass = vertexClass;
   }
 
-  public Class<? extends HiveToVertex> getHiveToVertexClass() {
-    return hiveToVertexClass;
+  public List<VertexInputFormatDescription> getVertexInputDescriptions() {
+    return vertexInputDescriptions;
   }
 
   /**
-   * Set HiveToVertex used with HiveVertexInputFormat
+   * Whether to use vertex input.
    *
-   * @param hiveToVertexClass HiveToVertex
+   * @return true if vertex input enabled (at least one HiveToVertex is set).
    */
-  public void setHiveToVertexClass(
-      Class<? extends HiveToVertex> hiveToVertexClass) {
-    this.hiveToVertexClass = hiveToVertexClass;
-    HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass);
+  public boolean hasVertexInput() {
+    return !vertexInputDescriptions.isEmpty();
   }
 
   /**
-   * Whether to use vertex input.
+   * Add vertex input
    *
-   * @return true if vertex input enabled (HiveToVertex is set).
+   * @param hiveToVertexClass HiveToVertex class to use
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   * @param additionalOptions Additional options, in the form "option=value"
    */
-  public boolean hasVertexValueInput() {
-    return hiveToVertexClass != null;
+  public void addVertexInput(Class<? extends HiveToVertex> hiveToVertexClass,
+      String tableName, String partitionFilter, String ... additionalOptions) {
+    VertexInputFormatDescription description =
+        new VertexInputFormatDescription(HiveVertexInputFormat.class);
+    description.addParameter(
+        HIVE_TO_VERTEX_CLASS.getKey(), hiveToVertexClass.getName());
+    description.addParameter(HIVE_VERTEX_INPUT_PROFILE_ID.getKey(),
+        "vertex_input_profile_" + vertexInputDescriptions.size());
+    description.addParameter(
+        HIVE_VERTEX_INPUT_TABLE.getKey(), tableName);
+    if (partitionFilter != null && !partitionFilter.isEmpty()) {
+      description.addParameter(
+          HIVE_VERTEX_INPUT_PARTITION.getKey(), partitionFilter);
+    }
+    addAdditionalOptions(description, additionalOptions);
+    vertexInputDescriptions.add(description);
   }
 
-  public Class<? extends HiveToEdge> getHiveToEdgeClass() {
-    return hiveToEdgeClass;
+  public List<EdgeInputFormatDescription> getEdgeInputDescriptions() {
+    return edgeInputDescriptions;
   }
 
   /**
    * Whether to use edge input.
    *
-   * @return true if edge input enabled (HiveToEdge is set).
+   * @return true if edge input enabled (at least one HiveToEdge is set).
    */
   public boolean hasEdgeInput() {
-    return hiveToEdgeClass != null;
+    return !edgeInputDescriptions.isEmpty();
+  }
+
+  /**
+   * Add edge input
+   *
+   * @param hiveToEdgeClass HiveToEdge class to use
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   * @param additionalOptions Additional options, in the form "option=value"
+   */
+  public void addEdgeInput(Class<? extends HiveToEdge> hiveToEdgeClass,
+      String tableName, String partitionFilter, String ... additionalOptions) {
+    EdgeInputFormatDescription description =
+        new EdgeInputFormatDescription(HiveEdgeInputFormat.class);
+    description.addParameter(
+        HIVE_TO_EDGE_CLASS.getKey(), hiveToEdgeClass.getName());
+    description.addParameter(HIVE_EDGE_INPUT_PROFILE_ID.getKey(),
+        "edge_input_profile_" + edgeInputDescriptions.size());
+    description.addParameter(
+        HIVE_EDGE_INPUT_TABLE.getKey(), tableName);
+    if (partitionFilter != null && !partitionFilter.isEmpty()) {
+      description.addParameter(
+          HIVE_EDGE_INPUT_PARTITION.getKey(), partitionFilter);
+    }
+    addAdditionalOptions(description, additionalOptions);
+    edgeInputDescriptions.add(description);
   }
 
   /**
-   * Set HiveToEdge used with HiveEdgeInputFormat
+   * Add additional options to InputFormatDescription
    *
-   * @param hiveToEdgeClass HiveToEdge
+   * @param description InputFormatDescription
+   * @param additionalOptions Additional options
    */
-  public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
-    this.hiveToEdgeClass = hiveToEdgeClass;
-    HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass);
+  private static void addAdditionalOptions(InputFormatDescription description,
+      String ... additionalOptions) {
+    for (String additionalOption : additionalOptions) {
+      String[] nameValue = split(additionalOption, "=");
+      if (nameValue.length != 2) {
+        throw new IllegalStateException("Invalid additional option format " +
+            additionalOption + ", 'name=value' format expected");
+      }
+      description.addParameter(nameValue[0], nameValue[1]);
+    }
   }
 
   public Class<? extends VertexToHive> getVertexToHiveClass() {
@@ -169,14 +227,22 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Set class used to write vertices to Hive.
+   * Set vertex output
    *
    * @param vertexToHiveClass class for writing vertices to Hive.
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
    */
-  public void setVertexToHiveClass(
-      Class<? extends VertexToHive> vertexToHiveClass) {
+  public void setVertexOutput(
+      Class<? extends VertexToHive> vertexToHiveClass, String tableName,
+      String partitionFilter) {
     this.vertexToHiveClass = vertexToHiveClass;
     VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass);
+    HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+    HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName);
+    if (partitionFilter != null) {
+      HIVE_VERTEX_OUTPUT_PARTITION.set(conf, partitionFilter);
+    }
   }
 
   /**
@@ -209,9 +275,6 @@ public class HiveGiraphRunner implements Tool {
     GiraphConfiguration giraphConf = job.getConfiguration();
     giraphConf.setVertexClass(vertexClass);
 
-    setupHiveInputs(giraphConf);
-    setupHiveOutput(giraphConf);
-
     giraphConf.setWorkerConfiguration(workers, workers, 100.0f);
     initGiraphJob(job);
 
@@ -221,42 +284,48 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Initialize hive input settings
-   *
-   * @param conf Configuration to write to
-   * @throws TException thrift problem
+   * Prepare vertex input settings in Configuration
    */
-  private void setupHiveInputs(GiraphConfiguration conf) throws TException {
-    if (hiveToVertexClass != null) {
-      conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
-      HIVE_VERTEX_INPUT_PROFILE_ID.set(conf, "vertex_input_profile");
-    }
-
-    if (hiveToEdgeClass != null) {
-      conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
-      HIVE_EDGE_INPUT_PROFILE_ID.set(conf, "edge_input_profile");
+  @SuppressWarnings("unchecked")
+  public void prepareHiveVertexInputs() {
+    if (vertexInputDescriptions.size() == 1) {
+      GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+          vertexInputDescriptions.get(0).getInputFormatClass());
+      vertexInputDescriptions.get(0).putParametersToConfiguration(conf);
+    } else {
+      GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+          MultiVertexInputFormat.class);
+      VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+          InputFormatDescription.toJsonString(vertexInputDescriptions));
     }
   }
 
   /**
-   * Initialize hive output settings
-   *
-   * @param conf Configuration to write to
-   * @throws TException thrift problem
+   * Prepare edge input settings in Configuration
    */
-  private void setupHiveOutput(GiraphConfiguration conf) throws TException {
-    if (skipOutput) {
-      LOG.warn("run: Warning - Output will be skipped!");
-    } else if (vertexToHiveClass != null) {
-      conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
-      HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+  @SuppressWarnings("unchecked")
+  public void prepareHiveEdgeInputs() {
+    if (edgeInputDescriptions.size() == 1) {
+      GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+          edgeInputDescriptions.get(0).getInputFormatClass());
+      edgeInputDescriptions.get(0).putParametersToConfiguration(conf);
     } else {
-      LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
-          " not set");
+      GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+          MultiEdgeInputFormat.class);
+      EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+          InputFormatDescription.toJsonString(edgeInputDescriptions));
     }
   }
 
   /**
+   * Prepare output settings in Configuration
+   */
+  public void prepareHiveOutput() {
+    GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf,
+        HiveVertexOutputFormat.class);
+  }
+
+  /**
    * set hive configuration
    */
   private void adjustConfigurationForHive() {
@@ -312,47 +381,65 @@ public class HiveGiraphRunner implements Tool {
               " class name (-vertexClass) to use");
     }
 
-    String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
-    if (hiveToVertexClassStr != null) {
-      if (hiveToVertexClassStr.equals("disable")) {
-        hiveToVertexClass = null;
-      } else {
-        setHiveToVertexClass(
-            findClass(hiveToVertexClassStr, HiveToVertex.class));
+    String[] vertexInputs = cmdln.getOptionValues("vertexInput");
+    if (vertexInputs != null && vertexInputs.length != 0) {
+      vertexInputDescriptions.clear();
+      for (String vertexInput : vertexInputs) {
+        String[] parameters = split(vertexInput, ",");
+        if (parameters.length < 2) {
+          throw new IllegalStateException("Illegal vertex input description " +
+              vertexInput + " - HiveToVertex class and table name needed");
+        }
+        addVertexInput(findClass(parameters[0], HiveToVertex.class),
+            parameters[1], elementOrNull(parameters, 2),
+            copyOfArray(parameters, 3));
       }
     }
 
-    String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
-    if (hiveToEdgeClassStr != null) {
-      if (hiveToEdgeClassStr.equals("disable")) {
-        hiveToEdgeClass = null;
-      } else {
-        setHiveToEdgeClass(
-            findClass(hiveToEdgeClassStr, HiveToEdge.class));
+    String[] edgeInputs = cmdln.getOptionValues("edgeInput");
+    if (edgeInputs != null && edgeInputs.length != 0) {
+      edgeInputDescriptions.clear();
+      for (String edgeInput : edgeInputs) {
+        String[] parameters = split(edgeInput, ",");
+        if (parameters.length < 2) {
+          throw new IllegalStateException("Illegal edge input description " +
+              edgeInput + " - HiveToEdge class and table name needed");
+        }
+        addEdgeInput(findClass(parameters[0], HiveToEdge.class),
+            parameters[1], elementOrNull(parameters, 2),
+            copyOfArray(parameters, 3));
       }
     }
 
-    String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass");
-    if (vertexToHiveClassStr != null) {
-      setVertexToHiveClass(findClass(vertexToHiveClassStr, VertexToHive.class));
+    String output = cmdln.getOptionValue("output");
+    if (output != null) {
+      // Partition filter can contain commas so we limit the number of times
+      // we split
+      String[] parameters = split(output, ",", 3);
+      if (parameters.length < 2) {
+        throw new IllegalStateException("Illegal output description " +
+            output + " - VertexToHive class and table name needed");
+      }
+      setVertexOutput(findClass(parameters[0], VertexToHive.class),
+          parameters[1], elementOrNull(parameters, 2));
     }
 
     if (cmdln.hasOption("skipOutput")) {
       skipOutput = true;
     }
 
-    if (hiveToVertexClass == null && hiveToEdgeClass == null) {
+    if (!hasVertexInput() && !hasEdgeInput()) {
       throw new IllegalArgumentException(
           "Need at least one of Giraph " +
           HiveToVertex.class.getSimpleName() +
-          " class name (-hiveToVertexClass) and " +
+          " (-vertexInput) and " +
           HiveToEdge.class.getSimpleName() +
-          " class name (-hiveToEdgeClass)");
+          " (-edgeInput)");
     }
     if (vertexToHiveClass == null && !skipOutput) {
       throw new IllegalArgumentException(
           "Need the Giraph " + VertexToHive.class.getSimpleName() +
-          " class name (-vertexToHiveClass) to use");
+          " (-output) to use");
     }
     String workersStr = cmdln.getOptionValue("workers");
     if (workersStr == null) {
@@ -360,40 +447,24 @@ public class HiveGiraphRunner implements Tool {
           "Need to choose the number of workers (-w)");
     }
 
-    String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable");
-    if (vertexInputTableStr == null && hiveToVertexClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the vertex input table name (-vi)");
-    }
+    String dbName = cmdln.getOptionValue("dbName", "default");
 
-    String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable");
-    if (edgeInputTableStr == null && hiveToEdgeClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the edge input table name (-ei)");
+    if (hasVertexInput()) {
+      HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
+      prepareHiveVertexInputs();
     }
 
-    String outputTableStr = cmdln.getOptionValue("outputTable");
-    if (outputTableStr == null) {
-      throw new IllegalArgumentException(
-          "Need to set the output table name (-o)");
+    if (hasEdgeInput()) {
+      HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
+      prepareHiveEdgeInputs();
     }
 
-    String dbName = cmdln.getOptionValue("dbName", "default");
-
-    HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
-    HIVE_VERTEX_INPUT_TABLE.set(conf, vertexInputTableStr);
-    HIVE_VERTEX_INPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("vertexInputFilter"));
-
-    HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
-    HIVE_EDGE_INPUT_TABLE.set(conf, edgeInputTableStr);
-    HIVE_EDGE_INPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("edgeInputFilter"));
-
-    HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
-    HIVE_VERTEX_OUTPUT_TABLE.set(conf, cmdln.getOptionValue("outputTable"));
-    HIVE_VERTEX_OUTPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("outputPartition"));
+    if (!skipOutput) {
+      HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
+      prepareHiveOutput();
+    } else {
+      LOG.warn("run: Warning - Output will be skipped!");
+    }
 
     workers = Integer.parseInt(workersStr);
 
@@ -447,42 +518,44 @@ public class HiveGiraphRunner implements Tool {
     options.addOption("db", "dbName", true, "Hive database name");
 
     // Vertex input settings
-    options.addOption(null, "hiveToVertexClass", true,
-        "Giraph " + HiveToVertex.class.getSimpleName() +
-            " class to use (default - " +
-            (hiveToVertexClass == null ? "not used" :
-                hiveToVertexClass.getSimpleName()) + "), " +
-            "\"disable\" will unset this option");
-    options.addOption("vi", "vertexInputTable", true,
-        "Vertex input table name");
-    options.addOption("VI", "vertexInputFilter", true,
-        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("vi", "vertexInput", true, getInputOptionDescription(
+        "vertex", HiveToVertex.class.getSimpleName()));
 
     // Edge input settings
-    options.addOption(null, "hiveToEdgeClass", true,
-        "Giraph " + HiveToEdge.class.getSimpleName() +
-            " class to use (default - " +
-            (hiveToEdgeClass == null ? "not used" :
-                hiveToEdgeClass.getSimpleName()) + "), " +
-            "\"disable\" will unset this option");
-    options.addOption("ei", "edgeInputTable", true,
-        "Edge input table name");
-    options.addOption("EI", "edgeInputFilter", true,
-        "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("ei", "edgeInput", true, getInputOptionDescription(
+        "edge", HiveToEdge.class.getSimpleName()));
 
     // Vertex output settings
-    if (vertexToHiveClass == null) {
-      options.addOption(null, "vertexToHiveClass", true,
-          "Giraph " + VertexToHive.class.getSimpleName() + " class to use");
-    }
-
-    options.addOption("o", "outputTable", true, "Output table name");
-    options.addOption("O", "outputPartition", true,
-        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("o", "output", true,
+        "Giraph " + VertexToHive.class.getSimpleName() + " class to use," +
+            " table name and partition filter (optional). Example:\n" +
+            "\"MyVertexToHive, myTableName, a=1,b=two\"");
     options.addOption("s", "skipOutput", false, "Skip output?");
   }
 
   /**
+   * Get description for the input format option (vertex or edge).
+   *
+   * @param inputType Type of input (vertex or edge)
+   * @param hiveToObjectClassName HiveToVertex or HiveToEdge
+   * @return Description for the input format option
+   */
+  private static String getInputOptionDescription(String inputType,
+      String hiveToObjectClassName) {
+    StringBuilder inputOption = new StringBuilder();
+    inputOption.append("Giraph ").append(hiveToObjectClassName).append(
+        " class to use, table name and partition filter (optional).");
+    inputOption.append(" Additional options for the input format can be " +
+        "specified as well.");
+    inputOption.append(" You can set as many ").append(inputType).append(
+        " inputs as you like.");
+    inputOption.append(" Example:\n");
+    inputOption.append("\"My").append(hiveToObjectClassName).append(
+        ", myTableName, a<2 AND b='two', option1=value1, option2=value2\"");
+    return inputOption.toString();
+  }
+
+  /**
    * add string to collection
    * @param conf Configuration
    * @param name name to add
@@ -533,7 +606,7 @@ public class HiveGiraphRunner implements Tool {
 
   @Override
   public final void setConf(Configuration conf) {
-    this.conf = conf;
+    this.conf = new GiraphConfiguration(conf);
   }
 
   /**
@@ -569,35 +642,83 @@ public class HiveGiraphRunner implements Tool {
    * @param giraphConf GiraphConfiguration
    */
   private void logOptions(GiraphConfiguration giraphConf) {
-    GiraphClasses classes = new GiraphClasses(giraphConf);
+    GiraphClasses<?, ?, ?, ?> classes = new GiraphClasses(giraphConf);
 
     LOG.info(getClass().getSimpleName() + " with");
 
     LOG.info(LOG_PREFIX + "-vertexClass=" + vertexClass.getCanonicalName());
 
-    if (hiveToVertexClass != null) {
-      LOG.info(LOG_PREFIX + "-hiveToVertexClass=" +
-          hiveToVertexClass.getCanonicalName());
-    }
-    if (classes.getVertexInputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
-          classes.getVertexInputFormatClass().getCanonicalName());
+    for (VertexInputFormatDescription description : vertexInputDescriptions) {
+      LOG.info(LOG_PREFIX + "Vertex input: " + description);
     }
 
-    if (hiveToEdgeClass != null) {
-      LOG.info(LOG_PREFIX + "-hiveToEdgeClass=" +
-          hiveToEdgeClass.getCanonicalName());
-    }
-    if (classes.getEdgeInputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" +
-          classes.getEdgeInputFormatClass().getCanonicalName());
+    for (EdgeInputFormatDescription description : edgeInputDescriptions) {
+      LOG.info(LOG_PREFIX + "Edge input: " + description);
     }
 
     if (classes.getVertexOutputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-outputFormatClass=" +
-          classes.getVertexOutputFormatClass().getCanonicalName());
+      LOG.info(LOG_PREFIX + "Output: VertexToHive=" +
+          vertexToHiveClass.getCanonicalName() + ", table=" +
+          HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" +
+          HIVE_VERTEX_OUTPUT_PARTITION.get(conf) + "\"");
     }
 
     LOG.info(LOG_PREFIX + "-workers=" + workers);
   }
+
+  /**
+   * Split a string using separator and trim the results
+   *
+   * @param stringToSplit String to split
+   * @param separator Separator
+   * @return Separated strings, trimmed
+   */
+  private static String[] split(String stringToSplit, String separator) {
+    return split(stringToSplit, separator, -1);
+  }
+
+  /**
+   * Split a string using separator and trim the results
+   *
+   * @param stringToSplit String to split
+   * @param separator Separator
+   * @param limit See {@link String#split(String, int)}
+   * @return Separated strings, trimmed
+   */
+  private static String[] split(String stringToSplit, String separator,
+      int limit) {
+    Splitter splitter = Splitter.on(separator).trimResults();
+    if (limit > 0) {
+      splitter = splitter.limit(limit);
+    }
+    return Iterables.toArray(splitter.split(stringToSplit), String.class);
+  }
+
+  /**
+   * Get the element in array at certain position, or null if the position is
+   * out of array size
+   *
+   * @param array Array
+   * @param position Position
+   * @return Element at the position or null if the position is out of array
+   */
+  private static String elementOrNull(String[] array, int position) {
+    return (position < array.length) ? array[position] : null;
+  }
+
+  /**
+   * Return a copy of array from some position to the end,
+   * or empty array if startIndex is out of array size
+   *
+   * @param array Array to take a copy from
+   * @param startIndex Starting position
+   * @return Copy of part of the array
+   */
+  private static String[] copyOfArray(String[] array, int startIndex) {
+    if (array.length <= startIndex) {
+      return new String[0];
+    } else {
+      return Arrays.copyOfRange(array, startIndex, array.length);
+    }
+  }
 }


[2/2] git commit: updated refs/heads/trunk to c5a87d1

Posted by ma...@apache.org.
GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c5a87d16
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c5a87d16
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c5a87d16

Branch: refs/heads/trunk
Commit: c5a87d16119c7e9a9f2af80a5534c4fe37605dd3
Parents: ff970f2
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Fri May 3 15:14:35 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Fri May 3 15:14:35 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   10 +-
 .../conf/ImmutableClassesGiraphConfiguration.java  |   14 +
 .../java/org/apache/giraph/io/EdgeInputFormat.java |    6 +-
 .../org/apache/giraph/io/GiraphInputFormat.java    |   47 ++-
 .../org/apache/giraph/io/VertexInputFormat.java    |    8 +-
 .../formats/multi/EdgeInputFormatDescription.java  |  145 +++++
 .../io/formats/multi/InputFormatDescription.java   |  167 ++++++
 .../multi/InputSplitWithInputFormatIndex.java      |   67 +++
 .../io/formats/multi/MultiEdgeInputFormat.java     |   93 ++++
 .../giraph/io/formats/multi/MultiInputUtils.java   |  103 ++++
 .../io/formats/multi/MultiVertexInputFormat.java   |   95 ++++
 .../multi/VertexInputFormatDescription.java        |  151 +++++
 .../giraph/io/formats/multi/package-info.java      |   22 +
 .../giraph/io/internal/WrappedEdgeInputFormat.java |   15 +
 .../io/internal/WrappedVertexInputFormat.java      |   15 +
 .../org/apache/giraph/master/BspServiceMaster.java |   14 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |    2 +
 .../giraph/worker/EdgeInputSplitsCallable.java     |   12 +-
 .../worker/EdgeInputSplitsCallableFactory.java     |    7 +
 .../apache/giraph/worker/InputSplitsCallable.java  |   16 +-
 .../giraph/worker/VertexInputSplitsCallable.java   |   13 +-
 .../worker/VertexInputSplitsCallableFactory.java   |    7 +
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |  427 +++++++++-----
 24 files changed, 1271 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7c906d2..4d2c90c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
+
   GIRAPH-653: Hadoop_non_secure broken (majakabiljo)
 
   GIRAPH-650: Exception in GiraphConfiguration initialization (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index deb2069..45a29ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -185,7 +185,7 @@ public class GiraphConfiguration extends Configuration
    *
    * @param vertexInputFormatClass Determines how graph is input
    */
-  public final void setVertexInputFormatClass(
+  public void setVertexInputFormatClass(
       Class<? extends VertexInputFormat> vertexInputFormatClass) {
     VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
   }
@@ -204,7 +204,7 @@ public class GiraphConfiguration extends Configuration
    *
    * @param edgeInputFormatClass Determines how graph is input
    */
-  public final void setEdgeInputFormatClass(
+  public void setEdgeInputFormatClass(
       Class<? extends EdgeInputFormat> edgeInputFormatClass) {
     EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
   }
@@ -955,8 +955,10 @@ public class GiraphConfiguration extends Configuration
    * @param conf Configuration
    */
   public void updateConfiguration(Configuration conf) {
-    for (Map.Entry<String, String> parameter : giraphSetParameters) {
-      conf.set(parameter.getKey(), parameter.getValue());
+    if (this != conf) {
+      for (Map.Entry<String, String> parameter : giraphSetParameters) {
+        conf.set(parameter.getKey(), parameter.getValue());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f5a926f..f992b37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -203,6 +203,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   @Override
+  public void setVertexInputFormatClass(
+      Class<? extends VertexInputFormat> vertexInputFormatClass) {
+    super.setVertexInputFormatClass(vertexInputFormatClass);
+    classes.setVertexInputFormatClass(vertexInputFormatClass);
+  }
+
+  @Override
   public boolean hasVertexOutputFormat() {
     return classes.hasVertexOutputFormat();
   }
@@ -305,6 +312,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     return wrappedEdgeInputFormat;
   }
 
+  @Override
+  public void setEdgeInputFormatClass(
+      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+    super.setEdgeInputFormatClass(edgeInputFormatClass);
+    classes.setEdgeInputFormatClass(edgeInputFormatClass);
+  }
+
   /**
    * Get the user's subclassed {@link AggregatorWriter}.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
index 2aac1f0..1f4451f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,10 +40,7 @@ import java.io.IOException;
  * @param <E> Edge data
  */
 public abstract class EdgeInputFormat<I extends WritableComparable,
-    E extends Writable>
-    extends
-    DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
-    implements GiraphInputFormat {
+    E extends Writable> extends GiraphInputFormat<I, Writable, E> {
   /**
    * Create an edge reader for a given split. The framework will call
    * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 13efc93..58d79a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -18,15 +18,29 @@
 
 package org.apache.giraph.io;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
  */
-public interface GiraphInputFormat {
+public abstract class GiraphInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
   /**
    * Get the list of input splits for the format.
    *
@@ -36,6 +50,33 @@ public interface GiraphInputFormat {
    * @throws IOException
    * @throws InterruptedException
    */
-  List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
-    throws IOException, InterruptedException;
+  public abstract List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException;
+
+  /**
+   * Write the split's class name, then the split's own Writable payload.
+   *
+   * @param inputSplit InputSplit to write; it is cast to Writable here
+   * @param dataOutput DataOutput receiving the class name and payload
+   */
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    Text.writeString(dataOutput, inputSplit.getClass().getName());
+    ((Writable) inputSplit).write(dataOutput);
+  }
+
+  /**
+   * Read the split class name, instantiate that class, then read its fields.
+   *
+   * @param dataInput DataInput to read the class name and split fields from
+   * @return InputSplit of the named class, populated through readFields
+   */
+  public InputSplit readInputSplit(DataInput dataInput) throws IOException,
+      ClassNotFoundException {
+    String inputSplitClass = Text.readString(dataInput);
+    InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance(
+            getConf().getClassByName(inputSplitClass), getConf());
+    ((Writable) inputSplit).readFields(dataInput);
+    return inputSplit;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
index c4d7fe2..6c8adc2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,20 +44,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
-    implements GiraphInputFormat {
+    V extends Writable, E extends Writable> extends GiraphInputFormat<I, V, E> {
   /**
    * Create a vertex reader for a given split. Guaranteed to have been
    * configured with setConf() prior to use.  The framework will also call
-   * {@linkVertexReader#initialize(InputSplit, TaskAttemptContext)} before
+   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
    * the split is used.
    *
    * @param split the split to be read
    * @param context the information about the task
    * @return a new record reader
    * @throws IOException
-   * @throws InterruptedException
    */
   public abstract VertexReader<I, V, E> createVertexReader(
       InputSplit split,

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
new file mode 100644
index 0000000..569cee9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the edge input format - holds edge input format class and all
+ * parameters specifically set for that edge input format.
+ *
+ * Used only with {@link MultiEdgeInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeInputFormatDescription<I extends WritableComparable,
+    E extends Writable> extends InputFormatDescription<EdgeInputFormat<I, E>> {
+  /**
+   * EdgeInputFormats description - JSON array containing a JSON array for
+   * each edge input. Edge input JSON arrays contain one or two elements -
+   * first one is the name of edge input class, and second one is JSON object
+   * with all specific parameters for this edge input. For example:
+   * [["EIF1",{"p":"v1"}],["EIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption EDGE_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiEdgeInput.descriptions", null);
+
+  /**
+   * Constructor with edge input format class
+   *
+   * @param edgeInputFormatClass Edge input format class
+   */
+  public EdgeInputFormatDescription(
+      Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass) {
+    super(edgeInputFormatClass);
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format
+   */
+  public EdgeInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Create a copy of configuration with this description's edge input format
+   * class set on it and all of this description's parameters applied
+   *
+   * @param conf Configuration which we want to create a copy from
+   * @return The configured copy
+   */
+  private ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>(conf);
+    confCopy.setEdgeInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(confCopy);
+    return confCopy;
+  }
+
+  /**
+   * Get descriptions of edge input formats from configuration.
+   *
+   * @param conf Configuration
+   * @param <I>  Vertex id
+   * @param <E>  Edge data
+   * @return List of edge input format descriptions, empty if none are set
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  List<EdgeInputFormatDescription<I, E>> getEdgeInputFormatDescriptions(
+      Configuration conf) {
+    String edgeInputFormatDescriptions =
+        EDGE_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (edgeInputFormatDescriptions == null) {
+      return Lists.newArrayList();
+    }
+    try {
+      JSONArray inputFormatsJson = new JSONArray(edgeInputFormatDescriptions);
+      List<EdgeInputFormatDescription<I, E>> descriptions =
+          Lists.newArrayListWithCapacity(inputFormatsJson.length());
+      for (int i = 0; i < inputFormatsJson.length(); i++) {
+        descriptions.add(new EdgeInputFormatDescription<I, E>(
+            inputFormatsJson.getJSONArray(i).toString())); // nested array
+      }
+      return descriptions;
+    } catch (JSONException e) {
+      throw new IllegalStateException("getEdgeInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          edgeInputFormatDescriptions, e);
+    }
+  }
+
+  /**
+   * Instantiate one edge input format per description in configuration
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @return List with all edge input formats, in description order
+   */
+  public static <I extends WritableComparable,
+      E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    List<EdgeInputFormatDescription<I, E>> formatDescriptions =
+        getEdgeInputFormatDescriptions(conf);
+    List<EdgeInputFormat<I, E>> createdFormats =
+        Lists.newArrayListWithCapacity(formatDescriptions.size());
+    for (EdgeInputFormatDescription<I, E> current : formatDescriptions) {
+      // Each format is built from its own copy of the configuration
+      createdFormats.add(current.createConfigurationCopy(conf)
+          .createWrappedEdgeInputFormat());
+    }
+    return createdFormats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
new file mode 100644
index 0000000..de34a1f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Description of the input format - holds input format class and all
+ * parameters specifically set for that input format.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ *
+ * @param <IF> Input format type
+ */
+public abstract class InputFormatDescription<IF extends GiraphInputFormat> {
+  /** Input format class */
+  private Class<? extends IF> inputFormatClass;
+  /** Parameters set specifically for this input format */
+  private final Map<String, String> parameters = Maps.newHashMap();
+
+  /**
+   * Constructor with input format class
+   *
+   * @param inputFormatClass Input format class
+   */
+  public InputFormatDescription(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Parse JSON array: [input format class name, optional parameters object]
+   *
+   * @param description Json string describing this input format
+   */
+  @SuppressWarnings("unchecked")
+  public InputFormatDescription(String description) {
+    try {
+      JSONArray jsonArray = new JSONArray(description);
+      inputFormatClass =
+          (Class<? extends IF>) Class.forName(jsonArray.getString(0));
+      if (jsonArray.length() > 1) {
+        addParameters(jsonArray.getJSONObject(1));
+      }
+    } catch (JSONException e) {
+      throw new IllegalStateException(
+          "Failed to parse JSON " + description, e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Couldn't find " +
+          "input format class from description " + description, e);
+    }
+  }
+
+  /**
+   * Set parameter for this input format, replacing any value under its name
+   *
+   * @param name  Parameter name
+   * @param value Parameter value
+   */
+  public void addParameter(String name, String value) {
+    parameters.put(name, value);
+  }
+
+  /**
+   * Copy every key/value pair of the json object into the parameters
+   *
+   * @param parametersJson Json object to read parameters from
+   */
+  public void addParameters(JSONObject parametersJson) {
+    Iterator<?> names = parametersJson.keys();
+    try {
+      while (names.hasNext()) {
+        String name = (String) names.next();
+        addParameter(name, parametersJson.getString(name));
+      }
+    } catch (JSONException e) {
+      throw new IllegalStateException("addParameters: Failed to parse " +
+          parametersJson, e);
+    }
+  }
+
+  /**
+   * Convert input format description to json array: class name, parameters
+   *
+   * @return Json array representing this input format description
+   */
+  public JSONArray toJsonArray() {
+    JSONArray jsonArray = new JSONArray();
+    jsonArray.put(inputFormatClass.getName());
+    JSONObject jsonParameters = new JSONObject();
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      try {
+        jsonParameters.put(entry.getKey(), entry.getValue());
+      } catch (JSONException e) {
+        throw new IllegalStateException("toJsonArray: JSONException occurred " +
+            "while trying to process (" + entry.getKey() + ", " +
+            entry.getValue() + ")", e);
+      }
+    }
+    jsonArray.put(jsonParameters);
+    return jsonArray;
+  }
+
+  public Class<? extends IF> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Set every parameter of this input format description on configuration
+   *
+   * @param conf Configuration to put parameters to
+   */
+  public void putParametersToConfiguration(Configuration conf) {
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonArray().toString();
+  }
+
+  /**
+   * Create JSON string: a json array holding each description's json array
+   *
+   * @param descriptions InputFormatDescriptions
+   * @return JSON string describing these InputFormatDescriptions
+   */
+  public static String toJsonString(
+      Iterable<? extends InputFormatDescription> descriptions) {
+    JSONArray jsonArray = new JSONArray();
+    for (InputFormatDescription description : descriptions) {
+      jsonArray.put(description.toJsonArray());
+    }
+    return jsonArray.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
new file mode 100644
index 0000000..c6bba29
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * Helper InputSplit class which holds the information about input format
+ * which owns this input split.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ */
+class InputSplitWithInputFormatIndex extends InputSplit {
+  /** Wrapped input split, fixed at construction */
+  private final InputSplit split;
+  /** Index of input format which owns the input split, fixed as well */
+  private final int inputFormatIndex;
+
+  /**
+   * Constructor
+   *
+   * @param split            Input split to wrap
+   * @param inputFormatIndex Index of input format which owns the input split
+   */
+  InputSplitWithInputFormatIndex(InputSplit split, int inputFormatIndex) {
+    this.inputFormatIndex = inputFormatIndex;
+    this.split = split;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return split.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return split.getLocations();
+  }
+
+  int getInputFormatIndex() {
+    return inputFormatIndex;
+  }
+
+  InputSplit getSplit() {
+    return split;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
new file mode 100644
index 0000000..113b3bc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Edge input format which wraps several edge input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class MultiEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Wrapped edge input formats; a split's input format index selects one */
+  private List<EdgeInputFormat<I, E>> edgeInputFormats;
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    super.setConf(conf);
+    edgeInputFormats =
+        EdgeInputFormatDescription.createEdgeInputFormats(getConf());
+    if (edgeInputFormats.isEmpty()) {
+      throw new IllegalStateException("setConf: Using MultiEdgeInputFormat " +
+          "without specifying edge inputs");
+    }
+  }
+
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    // Splits of any other type cannot be mapped back to a wrapped format
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
+          " was not created by this class: " + inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex indexedSplit =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    EdgeInputFormat<I, E> owner =
+        edgeInputFormats.get(indexedSplit.getInputFormatIndex());
+    return owner.createEdgeReader(indexedSplit.getSplit(), context);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return MultiInputUtils.getSplits(
+        context, minSplitCountHint, edgeInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(inputSplit, dataOutput, edgeInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, edgeInputFormats);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
new file mode 100644
index 0000000..a84a7fd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility methods used by {@link MultiVertexInputFormat} and
+ * {@link MultiEdgeInputFormat}
+ */
+public class MultiInputUtils {
+  /** Do not instantiate */
+  private MultiInputUtils() {
+  }
+
+  /**
+   * Get the input splits of all formats, each tagged with its format's index.
+   *
+   * @param context The job context
+   * @param minSplitCountHint Minimum number of splits to create (hint)
+   * @param inputFormats List of input formats
+   * @return The list of input splits
+   */
+  public static List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      InterruptedException {
+    List<InputSplit> taggedSplits = Lists.newArrayList();
+    for (int owner = 0; owner < inputFormats.size(); owner++) {
+      List<InputSplit> ownedSplits =
+          inputFormats.get(owner).getSplits(context, minSplitCountHint);
+      for (InputSplit ownedSplit : ownedSplits) {
+        taggedSplits.add(new InputSplitWithInputFormatIndex(ownedSplit, owner));
+      }
+    }
+    return taggedSplits;
+  }
+
+  /**
+   * Write the index of the format owning the split, then let that format
+   * write the wrapped split.
+   *
+   * @param inputSplit InputSplit created by multi input
+   * @param dataOutput DataOutput
+   * @param inputFormats List of input formats
+   */
+  public static void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException {
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("writeInputSplit: Got InputSplit which " +
+          "was not created by multi input: " + inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex tagged =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    // The index goes first so that readInputSplit can pick the right format
+    dataOutput.writeInt(tagged.getInputFormatIndex());
+    inputFormats.get(tagged.getInputFormatIndex())
+        .writeInputSplit(tagged.getSplit(), dataOutput);
+  }
+
+  /**
+   * Read the index of the owning format, then let that format read the
+   * split, and wrap the result with the index again.
+   *
+   * @param dataInput DataInput
+   * @param inputFormats List of input formats
+   * @return InputSplit
+   */
+  public static InputSplit readInputSplit(
+      DataInput dataInput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      ClassNotFoundException {
+    int owner = dataInput.readInt();
+    return new InputSplitWithInputFormatIndex(
+        inputFormats.get(owner).readInputSplit(dataInput), owner);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
new file mode 100644
index 0000000..631a451
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Vertex input format which wraps several vertex input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class MultiVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends VertexInputFormat<I, V, E> {
+  /** Wrapped vertex input formats; a split's input format index selects one */
+  private List<VertexInputFormat<I, V, E>> vertexInputFormats;
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    super.setConf(conf);
+    vertexInputFormats =
+        VertexInputFormatDescription.createVertexInputFormats(getConf());
+    if (vertexInputFormats.isEmpty()) {
+      throw new IllegalStateException("setConf: Using MultiVertexInputFormat " +
+          "without specifying vertex inputs");
+    }
+  }
+
+  @Override
+  public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    // Splits of any other type cannot be mapped back to a wrapped format
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("createVertexReader: Got InputSplit " +
+          "which was not created by this class: " +
+          inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex indexedSplit =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    VertexInputFormat<I, V, E> owner =
+        vertexInputFormats.get(indexedSplit.getInputFormatIndex());
+    return owner.createVertexReader(indexedSplit.getSplit(), context);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return MultiInputUtils.getSplits(
+        context, minSplitCountHint, vertexInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(inputSplit, dataOutput, vertexInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, vertexInputFormats);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
new file mode 100644
index 0000000..bdd5a74
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the vertex input format - holds vertex input format class and
+ * all parameters specifically set for that vertex input format.
+ *
+ * Used only with {@link MultiVertexInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class VertexInputFormatDescription<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends InputFormatDescription<VertexInputFormat<I, V, E>> {
+  /**
+   * VertexInputFormats description - JSON array containing a JSON array for
+   * each vertex input. Vertex input JSON arrays contain one or two elements -
+   * first one is the name of vertex input class, and second one is JSON object
+   * with all specific parameters for this vertex input. For example:
+   * [["VIF1",{"p":"v1"}],["VIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption VERTEX_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiVertexInput.descriptions", null);
+
+  /**
+   * Constructor with vertex input format class
+   *
+   * @param vertexInputFormatClass Vertex input format class
+   */
+  public VertexInputFormatDescription(
+      Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass) {
+    super(vertexInputFormatClass);
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format
+   */
+  public VertexInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Create a copy of configuration which additionally has all parameters for
+   * this input format set
+   *
+   * @param conf Configuration which we want to create a copy from
+   * @return Copy of configuration
+   */
+  private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(conf);
+    confCopy.setVertexInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(confCopy);
+    return confCopy;
+  }
+
+  /**
+   * Get descriptions of vertex input formats from configuration.
+   *
+   * @param conf Configuration
+   * @param <I>  Vertex id
+   * @param <V>  Vertex data
+   * @param <E>  Edge data
+   * @return List of vertex input format descriptions
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable>
+  List<VertexInputFormatDescription<I, V, E>> getVertexInputFormatDescriptions(
+      Configuration conf) {
+    String descriptionsJson = VERTEX_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (descriptionsJson == null) {
+      return Lists.newArrayList();
+    }
+    try {
+      JSONArray inputFormatsJson = new JSONArray(descriptionsJson);
+      List<VertexInputFormatDescription<I, V, E>> descriptions =
+          Lists.newArrayListWithCapacity(inputFormatsJson.length());
+      for (int i = 0; i < inputFormatsJson.length(); i++) {
+        // Each element is a JSON array, not a string; serialize it as is
+        descriptions.add(new VertexInputFormatDescription<I, V, E>(
+            inputFormatsJson.get(i).toString()));
+      }
+      return descriptions;
+    } catch (JSONException e) {
+      throw new IllegalStateException("getVertexInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          descriptionsJson, e);
+    }
+  }
+
+  /**
+   * Create all vertex input formats
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @return List with all vertex input formats
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable>
+  List<VertexInputFormat<I, V, E>> createVertexInputFormats(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    List<VertexInputFormatDescription<I, V, E>> descriptions =
+        getVertexInputFormatDescriptions(conf);
+    List<VertexInputFormat<I, V, E>> vertexInputFormats =
+        Lists.newArrayListWithCapacity(descriptions.size());
+    for (VertexInputFormatDescription<I, V, E> description : descriptions) {
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+          description.createConfigurationCopy(conf);
+      vertexInputFormats.add(confCopy.createWrappedVertexInputFormat());
+    }
+    return vertexInputFormats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
new file mode 100644
index 0000000..2611e9c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for Input formats which wrap several input formats and allow
+ * reading data from multiple sources.
+ */
+package org.apache.giraph.io.formats.multi;

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 9b14727..928b975 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
@@ -74,6 +76,7 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
       @Override
       public void setConf(
           ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+        WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf);
         super.setConf(conf);
         edgeReader.setConf(conf);
       }
@@ -113,4 +116,16 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
       }
     };
   }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
+    throws IOException {
+    originalInputFormat.writeInputSplit(inputSplit, dataOutput); // delegate
+  }
+
+  @Override
+  public InputSplit readInputSplit(DataInput dataInput)
+    throws IOException, ClassNotFoundException {
+    return originalInputFormat.readInputSplit(dataInput); // delegate
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index 4f0cfea..ed606e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
@@ -76,6 +78,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
       @Override
       public void setConf(
           ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+        WrappedVertexInputFormat.this.getConf().updateConfiguration(conf);
         super.setConf(conf);
         vertexReader.setConf(conf);
       }
@@ -110,4 +113,16 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
       }
     };
   }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
+    throws IOException {
+    originalInputFormat.writeInputSplit(inputSplit, dataOutput); // delegate
+  }
+
+  @Override
+  public InputSplit readInputSplit(DataInput dataInput)
+    throws IOException, ClassNotFoundException {
+    return originalInputFormat.readInputSplit(dataInput); // delegate
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 2c1a679..f00116a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -654,7 +654,8 @@ public class BspServiceMaster<I extends WritableComparable,
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
       taskExecutor.submit(new LogStacktraceCallable<Void>(
-          new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations)));
+          new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
+              writeLocations)));
     }
     taskExecutor.shutdown();
     ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
@@ -1872,6 +1873,8 @@ public class BspServiceMaster<I extends WritableComparable,
    * Upon failure call() throws an exception.
    */
   private class WriteInputSplit implements Callable<Void> {
+    /** Input format */
+    private final GiraphInputFormat inputFormat;
     /** Input split which we are going to write */
     private final InputSplit inputSplit;
     /** Input splits path */
@@ -1884,6 +1887,7 @@ public class BspServiceMaster<I extends WritableComparable,
     /**
      * Constructor
      *
+     * @param inputFormat Input format
      * @param inputSplit Input split which we are going to write
      * @param inputSplitsPath Input splits path
      * @param index Index of the input split
@@ -1891,10 +1895,12 @@ public class BspServiceMaster<I extends WritableComparable,
      *                       be used by workers for prioritizing local splits
      *                       when reading)
      */
-    public WriteInputSplit(InputSplit inputSplit,
+    public WriteInputSplit(GiraphInputFormat inputFormat,
+                           InputSplit inputSplit,
                            String inputSplitsPath,
                            int index,
                            boolean writeLocations) {
+      this.inputFormat = inputFormat;
       this.inputSplit = inputSplit;
       this.inputSplitsPath = inputSplitsPath;
       this.index = index;
@@ -1926,9 +1932,7 @@ public class BspServiceMaster<I extends WritableComparable,
               locations == null ? "" : locations.toString());
         }
 
-        Text.writeString(outputStream,
-            inputSplit.getClass().getName());
-        ((Writable) inputSplit).write(outputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
         inputSplitPath = inputSplitsPath + "/" + index;
         getZkExt().createExt(inputSplitPath,
             byteArrayOutputStream.toByteArray(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 01937ab..51edbac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -313,6 +313,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getConfiguration().createWrappedVertexInputFormat(),
             getContext(),
             graphState,
             getConfiguration(),
@@ -350,6 +351,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getConfiguration().createWrappedEdgeInputFormat(),
             getContext(),
             graphState,
             getConfiguration(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index de1e774..83fe5ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -57,6 +57,8 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
       EdgeInputSplitsCallable.class);
+  /** Edge input format */
+  private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
 
@@ -67,6 +69,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param edgeInputFormat Edge input format
    * @param context Context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -75,6 +78,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
+      EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -83,6 +87,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
         splitsHandler, zooKeeperExt);
+    this.edgeInputFormat = edgeInputFormat;
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 
@@ -90,6 +95,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
+  @Override
+  public EdgeInputFormat<I, E> getInputFormat() {
+    return edgeInputFormat; // the format handed to the constructor
+  }
+
   /**
    * Read edges from input split.  If testing, the user may request a
    * maximum number of edges to be read from an input split.
@@ -105,8 +115,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       InputSplit inputSplit,
       GraphState<I, V, E, M> graphState) throws IOException,
       InterruptedException {
-    EdgeInputFormat<I, E> edgeInputFormat =
-        configuration.createWrappedEdgeInputFormat();
     EdgeReader<I, E> edgeReader =
         edgeInputFormat.createEdgeReader(inputSplit, context);
     edgeReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 4a1705b..33fb515 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements CallableFactory<VertexEdgeCount> {
+  /** Edge input format */
+  private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */
@@ -54,6 +57,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param edgeInputFormat Edge input format
    * @param context Mapper context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -62,12 +66,14 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public EdgeInputSplitsCallableFactory(
+      EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
+    this.edgeInputFormat = edgeInputFormat;
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
@@ -79,6 +85,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   @Override
   public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new EdgeInputSplitsCallable<I, V, E, M>(
+        edgeInputFormat,
         context,
         graphState,
         configuration,

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index a3a9ab7..f7a8340 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MeterDesc;
 import org.apache.giraph.time.SystemTime;
@@ -34,7 +35,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
@@ -118,6 +118,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
+   * Get input format
+   *
+   * @return Input format
+   */
+  public abstract GiraphInputFormat getInputFormat();
+
+  /**
    * Get Meter tracking edges loaded
    *
    * @return Meter tracking edges loaded
@@ -255,12 +262,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     if (useLocality) {
       Text.readString(inputStream); // location data unused here, skip
     }
-    String inputSplitClass = Text.readString(inputStream);
-    InputSplit inputSplit = (InputSplit)
-        ReflectionUtils.newInstance(
-            configuration.getClassByName(inputSplitClass),
-            configuration);
-    ((Writable) inputSplit).readFields(inputStream);
+    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("getInputSplit: Reserved " + inputSplitPath +

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 224856e..d32ccaf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.partition.PartitionOwner;
@@ -57,6 +58,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(VertexInputSplitsCallable.class);
+  /** Vertex input format */
+  private final VertexInputFormat<I, V, E> vertexInputFormat;
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
   /** Bsp service worker (only use thread-safe methods) */
@@ -71,6 +74,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param vertexInputFormat Vertex input format
    * @param context Context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -79,6 +83,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
+      VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -87,6 +92,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
         splitsHandler, zooKeeperExt);
+    this.vertexInputFormat = vertexInputFormat;
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;
@@ -96,6 +102,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
+  @Override
+  public GiraphInputFormat getInputFormat() {
+    return vertexInputFormat; // the format handed to the constructor
+  }
+
   /**
    * Read vertices from input split.  If testing, the user may request a
    * maximum number of vertices to be read from an input split.
@@ -111,8 +122,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       InputSplit inputSplit,
       GraphState<I, V, E, M> graphState)
     throws IOException, InterruptedException {
-    VertexInputFormat<I, V, E> vertexInputFormat =
-        configuration.createWrappedVertexInputFormat();
     VertexReader<I, V, E> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 4eff3b8..cf5e8ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 public class VertexInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements CallableFactory<VertexEdgeCount> {
+  /** Vertex input format */
+  private final VertexInputFormat<I, V, E> vertexInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */
@@ -54,6 +57,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param vertexInputFormat Vertex input format
    * @param context Mapper context
    * @param graphState Graph state
    * @param configuration Configuration
@@ -62,12 +66,14 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public VertexInputSplitsCallableFactory(
+      VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
+    this.vertexInputFormat = vertexInputFormat;
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
@@ -79,6 +85,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   @Override
   public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
     return new VertexInputSplitsCallable<I, V, E, M>(
+        vertexInputFormat,
         context,
         graphState,
         configuration,