You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/04 00:28:26 UTC
[1/2] GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
Updated Branches:
refs/heads/trunk ff970f2db -> c5a87d161
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 8d67c1d..98be881 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
import org.apache.giraph.hive.input.edge.HiveToEdge;
@@ -33,14 +34,20 @@ import org.apache.giraph.hive.input.vertex.HiveToVertex;
import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
import org.apache.giraph.hive.output.HiveVertexOutputFormat;
import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.io.formats.multi.EdgeInputFormatDescription;
+import org.apache.giraph.io.formats.multi.InputFormatDescription;
+import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat;
+import org.apache.giraph.io.formats.multi.MultiVertexInputFormat;
+import org.apache.giraph.io.formats.multi.VertexInputFormatDescription;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.File;
@@ -81,11 +88,13 @@ public class HiveGiraphRunner implements Tool {
/** vertex class. */
private Class<? extends Vertex> vertexClass;
- /** Vertex creator from hive records. */
- private Class<? extends HiveToVertex> hiveToVertexClass;
+ /** Descriptions of vertex input formats */
+ private List<VertexInputFormatDescription> vertexInputDescriptions =
+ Lists.newArrayList();
- /** Edge creator from hive records. */
- private Class<? extends HiveToEdge> hiveToEdgeClass;
+ /** Descriptions of edge input formats */
+ private List<EdgeInputFormatDescription> edgeInputDescriptions =
+ Lists.newArrayList();
/** Hive Vertex writer */
private Class<? extends VertexToHive> vertexToHiveClass;
@@ -108,51 +117,100 @@ public class HiveGiraphRunner implements Tool {
this.vertexClass = vertexClass;
}
- public Class<? extends HiveToVertex> getHiveToVertexClass() {
- return hiveToVertexClass;
+ public List<VertexInputFormatDescription> getVertexInputDescriptions() {
+ return vertexInputDescriptions;
}
/**
- * Set HiveToVertex used with HiveVertexInputFormat
+ * Whether to use vertex input.
*
- * @param hiveToVertexClass HiveToVertex
+ * @return true if vertex input enabled (at least one HiveToVertex is set).
*/
- public void setHiveToVertexClass(
- Class<? extends HiveToVertex> hiveToVertexClass) {
- this.hiveToVertexClass = hiveToVertexClass;
- HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass);
+ public boolean hasVertexInput() {
+ return !vertexInputDescriptions.isEmpty();
}
/**
- * Whether to use vertex input.
+ * Add vertex input
*
- * @return true if vertex input enabled (HiveToVertex is set).
+ * @param hiveToVertexClass HiveToVertex class to use
+ * @param tableName Table name
+ * @param partitionFilter Partition filter, or null if no filter used
+ * @param additionalOptions Additional options, in the form "option=value"
*/
- public boolean hasVertexValueInput() {
- return hiveToVertexClass != null;
+ public void addVertexInput(Class<? extends HiveToVertex> hiveToVertexClass,
+ String tableName, String partitionFilter, String ... additionalOptions) {
+ VertexInputFormatDescription description =
+ new VertexInputFormatDescription(HiveVertexInputFormat.class);
+ description.addParameter(
+ HIVE_TO_VERTEX_CLASS.getKey(), hiveToVertexClass.getName());
+ description.addParameter(HIVE_VERTEX_INPUT_PROFILE_ID.getKey(),
+ "vertex_input_profile_" + vertexInputDescriptions.size());
+ description.addParameter(
+ HIVE_VERTEX_INPUT_TABLE.getKey(), tableName);
+ if (partitionFilter != null && !partitionFilter.isEmpty()) {
+ description.addParameter(
+ HIVE_VERTEX_INPUT_PARTITION.getKey(), partitionFilter);
+ }
+ addAdditionalOptions(description, additionalOptions);
+ vertexInputDescriptions.add(description);
}
- public Class<? extends HiveToEdge> getHiveToEdgeClass() {
- return hiveToEdgeClass;
+ public List<EdgeInputFormatDescription> getEdgeInputDescriptions() {
+ return edgeInputDescriptions;
}
/**
* Whether to use edge input.
*
- * @return true if edge input enabled (HiveToEdge is set).
+ * @return true if edge input enabled (at least one HiveToEdge is set).
*/
public boolean hasEdgeInput() {
- return hiveToEdgeClass != null;
+ return !edgeInputDescriptions.isEmpty();
+ }
+
+ /**
+ * Add edge input
+ *
+ * @param hiveToEdgeClass HiveToEdge class to use
+ * @param tableName Table name
+ * @param partitionFilter Partition filter, or null if no filter used
+ * @param additionalOptions Additional options, in the form "option=value"
+ */
+ public void addEdgeInput(Class<? extends HiveToEdge> hiveToEdgeClass,
+ String tableName, String partitionFilter, String ... additionalOptions) {
+ EdgeInputFormatDescription description =
+ new EdgeInputFormatDescription(HiveEdgeInputFormat.class);
+ description.addParameter(
+ HIVE_TO_EDGE_CLASS.getKey(), hiveToEdgeClass.getName());
+ description.addParameter(HIVE_EDGE_INPUT_PROFILE_ID.getKey(),
+ "edge_input_profile_" + edgeInputDescriptions.size());
+ description.addParameter(
+ HIVE_EDGE_INPUT_TABLE.getKey(), tableName);
+ if (partitionFilter != null && !partitionFilter.isEmpty()) {
+ description.addParameter(
+ HIVE_EDGE_INPUT_PARTITION.getKey(), partitionFilter);
+ }
+ addAdditionalOptions(description, additionalOptions);
+ edgeInputDescriptions.add(description);
}
/**
- * Set HiveToEdge used with HiveEdgeInputFormat
+ * Add additional options to InputFormatDescription
*
- * @param hiveToEdgeClass HiveToEdge
+ * @param description InputFormatDescription
+ * @param additionalOptions Additional options
*/
- public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
- this.hiveToEdgeClass = hiveToEdgeClass;
- HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass);
+ private static void addAdditionalOptions(InputFormatDescription description,
+ String ... additionalOptions) {
+ for (String additionalOption : additionalOptions) {
+ String[] nameValue = split(additionalOption, "=");
+ if (nameValue.length != 2) {
+ throw new IllegalStateException("Invalid additional option format " +
+ additionalOption + ", 'name=value' format expected");
+ }
+ description.addParameter(nameValue[0], nameValue[1]);
+ }
}
public Class<? extends VertexToHive> getVertexToHiveClass() {
@@ -169,14 +227,22 @@ public class HiveGiraphRunner implements Tool {
}
/**
- * Set class used to write vertices to Hive.
+ * Set vertex output
*
* @param vertexToHiveClass class for writing vertices to Hive.
+ * @param tableName Table name
+ * @param partitionFilter Partition filter, or null if no filter used
*/
- public void setVertexToHiveClass(
- Class<? extends VertexToHive> vertexToHiveClass) {
+ public void setVertexOutput(
+ Class<? extends VertexToHive> vertexToHiveClass, String tableName,
+ String partitionFilter) {
this.vertexToHiveClass = vertexToHiveClass;
VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass);
+ HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+ HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName);
+ if (partitionFilter != null) {
+ HIVE_VERTEX_OUTPUT_PARTITION.set(conf, partitionFilter);
+ }
}
/**
@@ -209,9 +275,6 @@ public class HiveGiraphRunner implements Tool {
GiraphConfiguration giraphConf = job.getConfiguration();
giraphConf.setVertexClass(vertexClass);
- setupHiveInputs(giraphConf);
- setupHiveOutput(giraphConf);
-
giraphConf.setWorkerConfiguration(workers, workers, 100.0f);
initGiraphJob(job);
@@ -221,42 +284,48 @@ public class HiveGiraphRunner implements Tool {
}
/**
- * Initialize hive input settings
- *
- * @param conf Configuration to write to
- * @throws TException thrift problem
+ * Prepare vertex input settings in Configuration
*/
- private void setupHiveInputs(GiraphConfiguration conf) throws TException {
- if (hiveToVertexClass != null) {
- conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
- HIVE_VERTEX_INPUT_PROFILE_ID.set(conf, "vertex_input_profile");
- }
-
- if (hiveToEdgeClass != null) {
- conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
- HIVE_EDGE_INPUT_PROFILE_ID.set(conf, "edge_input_profile");
+ @SuppressWarnings("unchecked")
+ public void prepareHiveVertexInputs() {
+ if (vertexInputDescriptions.size() == 1) {
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ vertexInputDescriptions.get(0).getInputFormatClass());
+ vertexInputDescriptions.get(0).putParametersToConfiguration(conf);
+ } else {
+ GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+ MultiVertexInputFormat.class);
+ VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+ InputFormatDescription.toJsonString(vertexInputDescriptions));
}
}
/**
- * Initialize hive output settings
- *
- * @param conf Configuration to write to
- * @throws TException thrift problem
+ * Prepare edge input settings in Configuration
*/
- private void setupHiveOutput(GiraphConfiguration conf) throws TException {
- if (skipOutput) {
- LOG.warn("run: Warning - Output will be skipped!");
- } else if (vertexToHiveClass != null) {
- conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
- HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+ @SuppressWarnings("unchecked")
+ public void prepareHiveEdgeInputs() {
+ if (edgeInputDescriptions.size() == 1) {
+ GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+ edgeInputDescriptions.get(0).getInputFormatClass());
+ edgeInputDescriptions.get(0).putParametersToConfiguration(conf);
} else {
- LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
- " not set");
+ GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+ MultiEdgeInputFormat.class);
+ EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+ InputFormatDescription.toJsonString(edgeInputDescriptions));
}
}
/**
+ * Prepare output settings in Configuration
+ */
+ public void prepareHiveOutput() {
+ GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf,
+ HiveVertexOutputFormat.class);
+ }
+
+ /**
* set hive configuration
*/
private void adjustConfigurationForHive() {
@@ -312,47 +381,65 @@ public class HiveGiraphRunner implements Tool {
" class name (-vertexClass) to use");
}
- String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
- if (hiveToVertexClassStr != null) {
- if (hiveToVertexClassStr.equals("disable")) {
- hiveToVertexClass = null;
- } else {
- setHiveToVertexClass(
- findClass(hiveToVertexClassStr, HiveToVertex.class));
+ String[] vertexInputs = cmdln.getOptionValues("vertexInput");
+ if (vertexInputs != null && vertexInputs.length != 0) {
+ vertexInputDescriptions.clear();
+ for (String vertexInput : vertexInputs) {
+ String[] parameters = split(vertexInput, ",");
+ if (parameters.length < 2) {
+ throw new IllegalStateException("Illegal vertex input description " +
+ vertexInput + " - HiveToVertex class and table name needed");
+ }
+ addVertexInput(findClass(parameters[0], HiveToVertex.class),
+ parameters[1], elementOrNull(parameters, 2),
+ copyOfArray(parameters, 3));
}
}
- String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
- if (hiveToEdgeClassStr != null) {
- if (hiveToEdgeClassStr.equals("disable")) {
- hiveToEdgeClass = null;
- } else {
- setHiveToEdgeClass(
- findClass(hiveToEdgeClassStr, HiveToEdge.class));
+ String[] edgeInputs = cmdln.getOptionValues("edgeInput");
+ if (edgeInputs != null && edgeInputs.length != 0) {
+ edgeInputDescriptions.clear();
+ for (String edgeInput : edgeInputs) {
+ String[] parameters = split(edgeInput, ",");
+ if (parameters.length < 2) {
+ throw new IllegalStateException("Illegal edge input description " +
+ edgeInput + " - HiveToEdge class and table name needed");
+ }
+ addEdgeInput(findClass(parameters[0], HiveToEdge.class),
+ parameters[1], elementOrNull(parameters, 2),
+ copyOfArray(parameters, 3));
}
}
- String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass");
- if (vertexToHiveClassStr != null) {
- setVertexToHiveClass(findClass(vertexToHiveClassStr, VertexToHive.class));
+ String output = cmdln.getOptionValue("output");
+ if (output != null) {
+ // Partition filter can contain commas so we limit the number of times
+ // we split
+ String[] parameters = split(output, ",", 3);
+ if (parameters.length < 2) {
+ throw new IllegalStateException("Illegal output description " +
+ output + " - VertexToHive class and table name needed");
+ }
+ setVertexOutput(findClass(parameters[0], VertexToHive.class),
+ parameters[1], elementOrNull(parameters, 2));
}
if (cmdln.hasOption("skipOutput")) {
skipOutput = true;
}
- if (hiveToVertexClass == null && hiveToEdgeClass == null) {
+ if (!hasVertexInput() && !hasEdgeInput()) {
throw new IllegalArgumentException(
"Need at least one of Giraph " +
HiveToVertex.class.getSimpleName() +
- " class name (-hiveToVertexClass) and " +
+ " (-vertexInput) and " +
HiveToEdge.class.getSimpleName() +
- " class name (-hiveToEdgeClass)");
+ " (-edgeInput)");
}
if (vertexToHiveClass == null && !skipOutput) {
throw new IllegalArgumentException(
"Need the Giraph " + VertexToHive.class.getSimpleName() +
- " class name (-vertexToHiveClass) to use");
+ " (-output) to use");
}
String workersStr = cmdln.getOptionValue("workers");
if (workersStr == null) {
@@ -360,40 +447,24 @@ public class HiveGiraphRunner implements Tool {
"Need to choose the number of workers (-w)");
}
- String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable");
- if (vertexInputTableStr == null && hiveToVertexClass != null) {
- throw new IllegalArgumentException(
- "Need to set the vertex input table name (-vi)");
- }
+ String dbName = cmdln.getOptionValue("dbName", "default");
- String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable");
- if (edgeInputTableStr == null && hiveToEdgeClass != null) {
- throw new IllegalArgumentException(
- "Need to set the edge input table name (-ei)");
+ if (hasVertexInput()) {
+ HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
+ prepareHiveVertexInputs();
}
- String outputTableStr = cmdln.getOptionValue("outputTable");
- if (outputTableStr == null) {
- throw new IllegalArgumentException(
- "Need to set the output table name (-o)");
+ if (hasEdgeInput()) {
+ HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
+ prepareHiveEdgeInputs();
}
- String dbName = cmdln.getOptionValue("dbName", "default");
-
- HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
- HIVE_VERTEX_INPUT_TABLE.set(conf, vertexInputTableStr);
- HIVE_VERTEX_INPUT_PARTITION.set(conf,
- cmdln.getOptionValue("vertexInputFilter"));
-
- HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
- HIVE_EDGE_INPUT_TABLE.set(conf, edgeInputTableStr);
- HIVE_EDGE_INPUT_PARTITION.set(conf,
- cmdln.getOptionValue("edgeInputFilter"));
-
- HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
- HIVE_VERTEX_OUTPUT_TABLE.set(conf, cmdln.getOptionValue("outputTable"));
- HIVE_VERTEX_OUTPUT_PARTITION.set(conf,
- cmdln.getOptionValue("outputPartition"));
+ if (!skipOutput) {
+ HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
+ prepareHiveOutput();
+ } else {
+ LOG.warn("run: Warning - Output will be skipped!");
+ }
workers = Integer.parseInt(workersStr);
@@ -447,42 +518,44 @@ public class HiveGiraphRunner implements Tool {
options.addOption("db", "dbName", true, "Hive database name");
// Vertex input settings
- options.addOption(null, "hiveToVertexClass", true,
- "Giraph " + HiveToVertex.class.getSimpleName() +
- " class to use (default - " +
- (hiveToVertexClass == null ? "not used" :
- hiveToVertexClass.getSimpleName()) + "), " +
- "\"disable\" will unset this option");
- options.addOption("vi", "vertexInputTable", true,
- "Vertex input table name");
- options.addOption("VI", "vertexInputFilter", true,
- "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
+ options.addOption("vi", "vertexInput", true, getInputOptionDescription(
+ "vertex", HiveToVertex.class.getSimpleName()));
// Edge input settings
- options.addOption(null, "hiveToEdgeClass", true,
- "Giraph " + HiveToEdge.class.getSimpleName() +
- " class to use (default - " +
- (hiveToEdgeClass == null ? "not used" :
- hiveToEdgeClass.getSimpleName()) + "), " +
- "\"disable\" will unset this option");
- options.addOption("ei", "edgeInputTable", true,
- "Edge input table name");
- options.addOption("EI", "edgeInputFilter", true,
- "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
+ options.addOption("ei", "edgeInput", true, getInputOptionDescription(
+ "edge", HiveToEdge.class.getSimpleName()));
// Vertex output settings
- if (vertexToHiveClass == null) {
- options.addOption(null, "vertexToHiveClass", true,
- "Giraph " + VertexToHive.class.getSimpleName() + " class to use");
- }
-
- options.addOption("o", "outputTable", true, "Output table name");
- options.addOption("O", "outputPartition", true,
- "Output table partition values (e.g., \"a=1,b=two\")");
+ options.addOption("o", "output", true,
+ "Giraph " + VertexToHive.class.getSimpleName() + " class to use," +
+ " table name and partition filter (optional). Example:\n" +
+ "\"MyVertexToHive, myTableName, a=1,b=two\"");
options.addOption("s", "skipOutput", false, "Skip output?");
}
/**
+ * Get description for the input format option (vertex or edge).
+ *
+ * @param inputType Type of input (vertex or edge)
+ * @param hiveToObjectClassName HiveToVertex or HiveToEdge
+ * @return Description for the input format option
+ */
+ private static String getInputOptionDescription(String inputType,
+ String hiveToObjectClassName) {
+ StringBuilder inputOption = new StringBuilder();
+ inputOption.append("Giraph ").append(hiveToObjectClassName).append(
+ " class to use, table name and partition filter (optional).");
+ inputOption.append(" Additional options for the input format can be " +
+ "specified as well.");
+ inputOption.append(" You can set as many ").append(inputType).append(
+ " inputs as you like.");
+ inputOption.append(" Example:\n");
+ inputOption.append("\"My").append(hiveToObjectClassName).append(
+ ", myTableName, a<2 AND b='two', option1=value1, option2=value2\"");
+ return inputOption.toString();
+ }
+
+ /**
* add string to collection
* @param conf Configuration
* @param name name to add
@@ -533,7 +606,7 @@ public class HiveGiraphRunner implements Tool {
@Override
public final void setConf(Configuration conf) {
- this.conf = conf;
+ this.conf = new GiraphConfiguration(conf);
}
/**
@@ -569,35 +642,83 @@ public class HiveGiraphRunner implements Tool {
* @param giraphConf GiraphConfiguration
*/
private void logOptions(GiraphConfiguration giraphConf) {
- GiraphClasses classes = new GiraphClasses(giraphConf);
+ GiraphClasses<?, ?, ?, ?> classes = new GiraphClasses(giraphConf);
LOG.info(getClass().getSimpleName() + " with");
LOG.info(LOG_PREFIX + "-vertexClass=" + vertexClass.getCanonicalName());
- if (hiveToVertexClass != null) {
- LOG.info(LOG_PREFIX + "-hiveToVertexClass=" +
- hiveToVertexClass.getCanonicalName());
- }
- if (classes.getVertexInputFormatClass() != null) {
- LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
- classes.getVertexInputFormatClass().getCanonicalName());
+ for (VertexInputFormatDescription description : vertexInputDescriptions) {
+ LOG.info(LOG_PREFIX + "Vertex input: " + description);
}
- if (hiveToEdgeClass != null) {
- LOG.info(LOG_PREFIX + "-hiveToEdgeClass=" +
- hiveToEdgeClass.getCanonicalName());
- }
- if (classes.getEdgeInputFormatClass() != null) {
- LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" +
- classes.getEdgeInputFormatClass().getCanonicalName());
+ for (EdgeInputFormatDescription description : edgeInputDescriptions) {
+ LOG.info(LOG_PREFIX + "Edge input: " + description);
}
if (classes.getVertexOutputFormatClass() != null) {
- LOG.info(LOG_PREFIX + "-outputFormatClass=" +
- classes.getVertexOutputFormatClass().getCanonicalName());
+ LOG.info(LOG_PREFIX + "Output: VertexToHive=" +
+ vertexToHiveClass.getCanonicalName() + ", table=" +
+ HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" +
+ HIVE_VERTEX_OUTPUT_PARTITION.get(conf) + "\"");
}
LOG.info(LOG_PREFIX + "-workers=" + workers);
}
+
+ /**
+ * Split a string using separator and trim the results
+ *
+ * @param stringToSplit String to split
+ * @param separator Separator
+ * @return Separated strings, trimmed
+ */
+ private static String[] split(String stringToSplit, String separator) {
+ return split(stringToSplit, separator, -1);
+ }
+
+ /**
+ * Split a string using separator and trim the results
+ *
+ * @param stringToSplit String to split
+ * @param separator Separator
+ * @param limit See {@link String#split(String, int)}
+ * @return Separated strings, trimmed
+ */
+ private static String[] split(String stringToSplit, String separator,
+ int limit) {
+ Splitter splitter = Splitter.on(separator).trimResults();
+ if (limit > 0) {
+ splitter = splitter.limit(limit);
+ }
+ return Iterables.toArray(splitter.split(stringToSplit), String.class);
+ }
+
+ /**
+ * Get the element in array at certain position, or null if the position is
+ * out of array size
+ *
+ * @param array Array
+ * @param position Position
+ * @return Element at the position or null if the position is out of array
+ */
+ private static String elementOrNull(String[] array, int position) {
+ return (position < array.length) ? array[position] : null;
+ }
+
+ /**
+ * Return a copy of array from some position to the end,
+ * or empty array if startIndex is out of array size
+ *
+ * @param array Array to take a copy from
+ * @param startIndex Starting position
+ * @return Copy of part of the array
+ */
+ private static String[] copyOfArray(String[] array, int startIndex) {
+ if (array.length <= startIndex) {
+ return new String[0];
+ } else {
+ return Arrays.copyOfRange(array, startIndex, array.length);
+ }
+ }
}
[2/2] git commit: updated refs/heads/trunk to c5a87d1
Posted by ma...@apache.org.
GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c5a87d16
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c5a87d16
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c5a87d16
Branch: refs/heads/trunk
Commit: c5a87d16119c7e9a9f2af80a5534c4fe37605dd3
Parents: ff970f2
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Fri May 3 15:14:35 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Fri May 3 15:14:35 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/conf/GiraphConfiguration.java | 10 +-
.../conf/ImmutableClassesGiraphConfiguration.java | 14 +
.../java/org/apache/giraph/io/EdgeInputFormat.java | 6 +-
.../org/apache/giraph/io/GiraphInputFormat.java | 47 ++-
.../org/apache/giraph/io/VertexInputFormat.java | 8 +-
.../formats/multi/EdgeInputFormatDescription.java | 145 +++++
.../io/formats/multi/InputFormatDescription.java | 167 ++++++
.../multi/InputSplitWithInputFormatIndex.java | 67 +++
.../io/formats/multi/MultiEdgeInputFormat.java | 93 ++++
.../giraph/io/formats/multi/MultiInputUtils.java | 103 ++++
.../io/formats/multi/MultiVertexInputFormat.java | 95 ++++
.../multi/VertexInputFormatDescription.java | 151 +++++
.../giraph/io/formats/multi/package-info.java | 22 +
.../giraph/io/internal/WrappedEdgeInputFormat.java | 15 +
.../io/internal/WrappedVertexInputFormat.java | 15 +
.../org/apache/giraph/master/BspServiceMaster.java | 14 +-
.../org/apache/giraph/worker/BspServiceWorker.java | 2 +
.../giraph/worker/EdgeInputSplitsCallable.java | 12 +-
.../worker/EdgeInputSplitsCallableFactory.java | 7 +
.../apache/giraph/worker/InputSplitsCallable.java | 16 +-
.../giraph/worker/VertexInputSplitsCallable.java | 13 +-
.../worker/VertexInputSplitsCallableFactory.java | 7 +
.../org/apache/giraph/hive/HiveGiraphRunner.java | 427 +++++++++-----
24 files changed, 1271 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7c906d2..4d2c90c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.0.1 - unreleased
+ GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
+
GIRAPH-653: Hadoop_non_secure broken (majakabiljo)
GIRAPH-650: Exception in GiraphConfiguration initialization (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index deb2069..45a29ff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -185,7 +185,7 @@ public class GiraphConfiguration extends Configuration
*
* @param vertexInputFormatClass Determines how graph is input
*/
- public final void setVertexInputFormatClass(
+ public void setVertexInputFormatClass(
Class<? extends VertexInputFormat> vertexInputFormatClass) {
VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
}
@@ -204,7 +204,7 @@ public class GiraphConfiguration extends Configuration
*
* @param edgeInputFormatClass Determines how graph is input
*/
- public final void setEdgeInputFormatClass(
+ public void setEdgeInputFormatClass(
Class<? extends EdgeInputFormat> edgeInputFormatClass) {
EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
}
@@ -955,8 +955,10 @@ public class GiraphConfiguration extends Configuration
* @param conf Configuration
*/
public void updateConfiguration(Configuration conf) {
- for (Map.Entry<String, String> parameter : giraphSetParameters) {
- conf.set(parameter.getKey(), parameter.getValue());
+ if (this != conf) {
+ for (Map.Entry<String, String> parameter : giraphSetParameters) {
+ conf.set(parameter.getKey(), parameter.getValue());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f5a926f..f992b37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -203,6 +203,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
@Override
+ public void setVertexInputFormatClass(
+ Class<? extends VertexInputFormat> vertexInputFormatClass) {
+ super.setVertexInputFormatClass(vertexInputFormatClass);
+ classes.setVertexInputFormatClass(vertexInputFormatClass);
+ }
+
+ @Override
public boolean hasVertexOutputFormat() {
return classes.hasVertexOutputFormat();
}
@@ -305,6 +312,13 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
return wrappedEdgeInputFormat;
}
+ @Override
+ public void setEdgeInputFormatClass(
+ Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+ super.setEdgeInputFormatClass(edgeInputFormatClass);
+ classes.setEdgeInputFormatClass(edgeInputFormatClass);
+ }
+
/**
* Get the user's subclassed {@link AggregatorWriter}.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
index 2aac1f0..1f4451f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java
@@ -18,7 +18,6 @@
package org.apache.giraph.io;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,10 +40,7 @@ import java.io.IOException;
* @param <E> Edge data
*/
public abstract class EdgeInputFormat<I extends WritableComparable,
- E extends Writable>
- extends
- DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
- implements GiraphInputFormat {
+ E extends Writable> extends GiraphInputFormat<I, Writable, E> {
/**
* Create an edge reader for a given split. The framework will call
* {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 13efc93..58d79a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -18,15 +18,29 @@
package org.apache.giraph.io;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
*/
-public interface GiraphInputFormat {
+public abstract class GiraphInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable> extends
+ DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
/**
* Get the list of input splits for the format.
*
@@ -36,6 +50,33 @@ public interface GiraphInputFormat {
* @throws IOException
* @throws InterruptedException
*/
- List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
- throws IOException, InterruptedException;
+ public abstract List<InputSplit> getSplits(JobContext context,
+ int minSplitCountHint) throws IOException, InterruptedException;
+
+  /**
+   * Write input split info to DataOutput. The concrete class name of the
+   * split is written first so that {@link #readInputSplit(DataInput)} can
+   * instantiate it again, followed by the split's own serialized fields.
+   *
+   * @param inputSplit InputSplit; must implement {@link Writable}
+   * @param dataOutput DataOutput
+   * @throws IOException if writing to dataOutput fails
+   * @throws IllegalArgumentException if inputSplit does not implement
+   *                                  {@link Writable}
+   */
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    // Fail with a descriptive message instead of a bare ClassCastException.
+    if (!(inputSplit instanceof Writable)) {
+      throw new IllegalArgumentException("writeInputSplit: InputSplit " +
+          inputSplit.getClass().getName() + " does not implement Writable");
+    }
+    Text.writeString(dataOutput, inputSplit.getClass().getName());
+    ((Writable) inputSplit).write(dataOutput);
+  }
+
+  /**
+   * Read input split info from DataInput. Reads the split class name written
+   * by {@link #writeInputSplit(InputSplit, DataOutput)}, instantiates that
+   * class with this format's configuration and lets the new instance read
+   * its own fields.
+   *
+   * @param dataInput DataInput
+   * @return InputSplit
+   * @throws IOException if reading from dataInput fails
+   * @throws ClassNotFoundException if the split class cannot be loaded
+   * @throws IllegalStateException if the named class is not a
+   *                               {@link Writable} {@link InputSplit}
+   */
+  public InputSplit readInputSplit(DataInput dataInput) throws IOException,
+      ClassNotFoundException {
+    String inputSplitClass = Text.readString(dataInput);
+    Class<?> splitClass = getConf().getClassByName(inputSplitClass);
+    // The class name comes from the stream, so verify it before casting.
+    if (!InputSplit.class.isAssignableFrom(splitClass) ||
+        !Writable.class.isAssignableFrom(splitClass)) {
+      throw new IllegalStateException("readInputSplit: Class " +
+          inputSplitClass + " is not a Writable InputSplit");
+    }
+    InputSplit inputSplit =
+        (InputSplit) ReflectionUtils.newInstance(splitClass, getConf());
+    ((Writable) inputSplit).readFields(dataInput);
+    return inputSplit;
+  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
index c4d7fe2..6c8adc2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,20 +44,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@SuppressWarnings("rawtypes")
public abstract class VertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
- implements GiraphInputFormat {
+ V extends Writable, E extends Writable> extends GiraphInputFormat<I, V, E> {
/**
* Create a vertex reader for a given split. Guaranteed to have been
* configured with setConf() prior to use. The framework will also call
- * {@linkVertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
*
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
- * @throws InterruptedException
*/
public abstract VertexReader<I, V, E> createVertexReader(
InputSplit split,
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
new file mode 100644
index 0000000..569cee9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the edge input format - holds edge input format class and all
+ * parameters specifically set for that edge input format.
+ *
+ * Used only with {@link MultiEdgeInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeInputFormatDescription<I extends WritableComparable,
+    E extends Writable> extends InputFormatDescription<EdgeInputFormat<I, E>> {
+  /**
+   * EdgeInputFormats description - JSON array containing a JSON array for
+   * each edge input. Edge input JSON arrays contain one or two elements -
+   * first one is the name of edge input class, and second one is JSON object
+   * with all specific parameters for this edge input. For example:
+   * [["EIF1",{"p":"v1"}],["EIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption EDGE_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiEdgeInput.descriptions", null);
+
+  /**
+   * Constructor with edge input format class
+   *
+   * @param edgeInputFormatClass Edge input format class
+   */
+  public EdgeInputFormatDescription(
+      Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass) {
+    super(edgeInputFormatClass);
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format, i.e. one
+   *                    element of {@link #EDGE_INPUT_FORMAT_DESCRIPTIONS}
+   */
+  public EdgeInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Create a copy of configuration which additionally has this description's
+   * class set as the edge input format class and all parameters for this
+   * input format set. The passed configuration is not modified.
+   *
+   * @param conf Configuration which we want to create a copy from
+   * @return Copy of configuration
+   */
+  private ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>(conf);
+    confCopy.setEdgeInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(confCopy);
+    return confCopy;
+  }
+
+  /**
+   * Get descriptions of edge input formats from configuration.
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @return List of edge input format descriptions; empty when
+   *         {@link #EDGE_INPUT_FORMAT_DESCRIPTIONS} is not set
+   * @throws IllegalStateException if the option value is not valid JSON
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  List<EdgeInputFormatDescription<I, E>> getEdgeInputFormatDescriptions(
+      Configuration conf) {
+    String edgeInputFormatDescriptions =
+        EDGE_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (edgeInputFormatDescriptions == null) {
+      return Lists.newArrayList();
+    }
+    try {
+      JSONArray inputFormatsJson = new JSONArray(edgeInputFormatDescriptions);
+      List<EdgeInputFormatDescription<I, E>> descriptions =
+          Lists.newArrayListWithCapacity(inputFormatsJson.length());
+      for (int i = 0; i < inputFormatsJson.length(); i++) {
+        // Each element is itself a JSON array. Newer org.json releases make
+        // getString(i) throw for non-string elements; get(i).toString()
+        // serializes a nested array back to JSON and returns a plain string
+        // element unchanged, so it works with old and new releases.
+        descriptions.add(new EdgeInputFormatDescription<I, E>(
+            inputFormatsJson.get(i).toString()));
+      }
+      return descriptions;
+    } catch (JSONException e) {
+      throw new IllegalStateException("getEdgeInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          edgeInputFormatDescriptions, e);
+    }
+  }
+
+  /**
+   * Create all edge input formats. Each format is created from its own copy
+   * of the configuration, carrying that format's specific parameters.
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @return List with all edge input formats, in description order
+   */
+  public static <I extends WritableComparable,
+      E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    List<EdgeInputFormatDescription<I, E>> descriptions =
+        getEdgeInputFormatDescriptions(conf);
+    List<EdgeInputFormat<I, E>> edgeInputFormats =
+        Lists.newArrayListWithCapacity(descriptions.size());
+    for (EdgeInputFormatDescription<I, E> description : descriptions) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+          description.createConfigurationCopy(conf);
+      edgeInputFormats.add(confCopy.createWrappedEdgeInputFormat());
+    }
+    return edgeInputFormats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
new file mode 100644
index 0000000..de34a1f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputFormatDescription.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Description of the input format - holds input format class and all
+ * parameters specifically set for that input format.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ *
+ * @param <IF> Input format type
+ */
+public abstract class InputFormatDescription<IF extends GiraphInputFormat> {
+  /** Input format class */
+  private Class<? extends IF> inputFormatClass;
+  /** Parameters set specifically for this input format */
+  private final Map<String, String> parameters = Maps.newHashMap();
+
+  /**
+   * Constructor with input format class
+   *
+   * @param inputFormatClass Input format class
+   */
+  public InputFormatDescription(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Constructor with json string describing this input format. The string
+   * is a JSON array whose first element is the input format class name and
+   * whose optional second element is a JSON object of parameters.
+   *
+   * @param description Json string describing this input format
+   * @throws IllegalStateException if the JSON cannot be parsed, the class
+   *                               cannot be found, or the class is not a
+   *                               {@link GiraphInputFormat}
+   */
+  @SuppressWarnings("unchecked")
+  public InputFormatDescription(String description) {
+    try {
+      JSONArray jsonArray = new JSONArray(description);
+      Class<?> parsedClass = Class.forName(jsonArray.getString(0));
+      // Reject wrong classes here; otherwise the unchecked cast below would
+      // only fail much later, when the input format is instantiated.
+      if (!GiraphInputFormat.class.isAssignableFrom(parsedClass)) {
+        throw new IllegalStateException("Class " + parsedClass.getName() +
+            " from description " + description +
+            " is not a GiraphInputFormat");
+      }
+      inputFormatClass = (Class<? extends IF>) parsedClass;
+      if (jsonArray.length() > 1) {
+        addParameters(jsonArray.getJSONObject(1));
+      }
+    } catch (JSONException e) {
+      throw new IllegalStateException(
+          "Failed to parse JSON " + description, e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Couldn't find " +
+          "input format class from description " + description, e);
+    }
+  }
+
+  /**
+   * Add parameter to this input format description
+   *
+   * @param name Parameter name
+   * @param value Parameter value
+   */
+  public void addParameter(String name, String value) {
+    parameters.put(name, value);
+  }
+
+  /**
+   * Add all parameters from json object. Non-string values (numbers,
+   * booleans, nested objects) are stored as their JSON text.
+   *
+   * @param parametersJson Json object to read parameters from
+   */
+  public void addParameters(JSONObject parametersJson) {
+    Iterator<?> keys = parametersJson.keys();
+    while (keys.hasNext()) {
+      String key = (String) keys.next();
+      try {
+        // get(key).toString() accepts values like {"p":5}; newer org.json
+        // releases make getString(key) throw for non-string values.
+        addParameter(key, parametersJson.get(key).toString());
+      } catch (JSONException e) {
+        throw new IllegalStateException("addParameters: Failed to parse " +
+            parametersJson, e);
+      }
+    }
+  }
+
+  /**
+   * Convert input format description to json array. The parameters object
+   * is always emitted as the second element, even when it is empty.
+   *
+   * @return Json array representing this input format description
+   */
+  public JSONArray toJsonArray() {
+    JSONArray jsonArray = new JSONArray();
+    jsonArray.put(inputFormatClass.getName());
+    JSONObject jsonParameters = new JSONObject();
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      try {
+        jsonParameters.put(entry.getKey(), entry.getValue());
+      } catch (JSONException e) {
+        throw new IllegalStateException("toJsonArray: JSONException occurred " +
+            "while trying to process (" + entry.getKey() + ", " +
+            entry.getValue() + ")", e);
+      }
+    }
+    jsonArray.put(jsonParameters);
+    return jsonArray;
+  }
+
+  /**
+   * Get the described input format class
+   *
+   * @return Input format class
+   */
+  public Class<? extends IF> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Set the described input format class
+   *
+   * @param inputFormatClass Input format class
+   */
+  public void setInputFormatClass(Class<? extends IF> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  /**
+   * Put parameters from this input format description to configuration,
+   * overwriting any existing values for the same keys.
+   *
+   * @param conf Configuration to put parameters to
+   */
+  public void putParametersToConfiguration(Configuration conf) {
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonArray().toString();
+  }
+
+  /**
+   * Create JSON string for the InputFormatDescriptions
+   *
+   * @param descriptions InputFormatDescriptions
+   * @return JSON string describing these InputFormatDescriptions
+   */
+  public static String toJsonString(
+      Iterable<? extends InputFormatDescription> descriptions) {
+    JSONArray jsonArray = new JSONArray();
+    for (InputFormatDescription description : descriptions) {
+      jsonArray.put(description.toJsonArray());
+    }
+    return jsonArray.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
new file mode 100644
index 0000000..c6bba29
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/InputSplitWithInputFormatIndex.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * Helper InputSplit class which holds the information about input format
+ * which owns this input split.
+ *
+ * Used only with input formats which wrap several input formats into one
+ * ({@link MultiVertexInputFormat} and {@link MultiEdgeInputFormat})
+ */
+class InputSplitWithInputFormatIndex extends InputSplit {
+  /** Wrapped input split */
+  private final InputSplit split;
+  /** Index of input format which owns the input split */
+  private final int inputFormatIndex;
+
+  /**
+   * Constructor
+   *
+   * @param split Input split
+   * @param inputFormatIndex Index of input format which owns the input split
+   */
+  InputSplitWithInputFormatIndex(InputSplit split, int inputFormatIndex) {
+    this.inputFormatIndex = inputFormatIndex;
+    this.split = split;
+  }
+
+  /** Delegates to the wrapped split. */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return split.getLength();
+  }
+
+  /** Delegates to the wrapped split. */
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return split.getLocations();
+  }
+
+  /**
+   * Get index of the input format which owns the wrapped split
+   *
+   * @return Input format index
+   */
+  int getInputFormatIndex() {
+    return inputFormatIndex;
+  }
+
+  /**
+   * Get the wrapped input split
+   *
+   * @return Wrapped input split
+   */
+  InputSplit getSplit() {
+    return split;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
new file mode 100644
index 0000000..113b3bc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Edge input format which wraps several edge input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class MultiEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Wrapped edge input formats, indexed like the splits they produce */
+  private List<EdgeInputFormat<I, E>> edgeInputFormats;
+
+  /**
+   * Store the configuration and build every wrapped edge input format
+   * described in it.
+   *
+   * @param conf Configuration
+   * @throws IllegalStateException if no edge inputs are described
+   */
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    super.setConf(conf);
+    edgeInputFormats =
+        EdgeInputFormatDescription.createEdgeInputFormats(getConf());
+    if (!edgeInputFormats.isEmpty()) {
+      return;
+    }
+    throw new IllegalStateException("setConf: Using MultiEdgeInputFormat " +
+        "without specifying edge inputs");
+  }
+
+  /**
+   * Delegate reader creation to the wrapped format that produced the split.
+   *
+   * @param inputSplit Split produced by {@link #getSplits}
+   * @param context Task context
+   * @return Edge reader of the owning input format
+   * @throws IOException if the wrapped format fails to create a reader
+   */
+  @Override
+  public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
+          " was not created by this class: " + inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex indexedSplit =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    return edgeInputFormats.get(indexedSplit.getInputFormatIndex())
+        .createEdgeReader(indexedSplit.getSplit(), context);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return MultiInputUtils.getSplits(
+        context, minSplitCountHint, edgeInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(
+        inputSplit, dataOutput, edgeInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, edgeInputFormats);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
new file mode 100644
index 0000000..a84a7fd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiInputUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility methods used by {@link MultiVertexInputFormat} and
+ * {@link MultiEdgeInputFormat}
+ */
+public final class MultiInputUtils {
+  /** Do not instantiate */
+  private MultiInputUtils() {
+  }
+
+  /**
+   * Get the list of input splits for all formats. Every returned split is
+   * wrapped together with the index of the format that produced it.
+   *
+   * Note: the same minSplitCountHint is forwarded unchanged to every wrapped
+   * format, so the combined number of splits may exceed the hint.
+   *
+   * @param context The job context
+   * @param minSplitCountHint Minimum number of splits to create (hint)
+   * @param inputFormats List of input formats
+   * @return The list of input splits
+   * @throws IOException if a wrapped format fails to compute its splits
+   * @throws InterruptedException if interrupted while computing splits
+   */
+  public static List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    for (int index = 0; index < inputFormats.size(); index++) {
+      List<InputSplit> inputFormatSplits =
+          inputFormats.get(index).getSplits(context, minSplitCountHint);
+      for (InputSplit split : inputFormatSplits) {
+        splits.add(new InputSplitWithInputFormatIndex(split, index));
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * Write input split info to DataOutput. Input split belongs to one of the
+   * formats; the owning format's index is written first, followed by the
+   * wrapped split as serialized by that format.
+   *
+   * @param inputSplit InputSplit
+   * @param dataOutput DataOutput
+   * @param inputFormats List of input formats
+   * @throws IOException if writing to dataOutput fails
+   * @throws IllegalStateException if the split was not created by
+   *                               {@link #getSplits}
+   */
+  public static void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException {
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("writeInputSplit: Got InputSplit which " +
+          "was not created by multi input: " + inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex split =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    int index = split.getInputFormatIndex();
+    dataOutput.writeInt(index);
+    inputFormats.get(index).writeInputSplit(split.getSplit(), dataOutput);
+  }
+
+  /**
+   * Read input split info from DataInput. Input split belongs to one of the
+   * formats; reads the format index written by
+   * {@link #writeInputSplit(InputSplit, DataOutput, List)} and lets that
+   * format read the wrapped split.
+   *
+   * @param dataInput DataInput
+   * @param inputFormats List of input formats
+   * @return InputSplit
+   * @throws IOException if reading from dataInput fails
+   * @throws ClassNotFoundException if the split class cannot be loaded
+   * @throws IllegalStateException if the stored format index does not match
+   *                               the configured input formats
+   */
+  public static InputSplit readInputSplit(
+      DataInput dataInput,
+      List<? extends GiraphInputFormat> inputFormats) throws IOException,
+      ClassNotFoundException {
+    int index = dataInput.readInt();
+    // The index comes from serialized data; report a mismatch between the
+    // writer's and reader's input format lists clearly.
+    if (index < 0 || index >= inputFormats.size()) {
+      throw new IllegalStateException("readInputSplit: Input format index " +
+          index + " is out of range, only " + inputFormats.size() +
+          " input formats are configured");
+    }
+    InputSplit split = inputFormats.get(index).readInputSplit(dataInput);
+    return new InputSplitWithInputFormatIndex(split, index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
new file mode 100644
index 0000000..631a451
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Vertex input format which wraps several vertex input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class MultiVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends VertexInputFormat<I, V, E> {
+  /** Wrapped vertex input formats, indexed like the splits they produce */
+  private List<VertexInputFormat<I, V, E>> vertexInputFormats;
+
+  /**
+   * Store the configuration and build every wrapped vertex input format
+   * described in it.
+   *
+   * @param conf Configuration
+   * @throws IllegalStateException if no vertex inputs are described
+   */
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    super.setConf(conf);
+    vertexInputFormats =
+        VertexInputFormatDescription.createVertexInputFormats(getConf());
+    if (!vertexInputFormats.isEmpty()) {
+      return;
+    }
+    throw new IllegalStateException("setConf: Using MultiVertexInputFormat " +
+        "without specifying vertex inputs");
+  }
+
+  /**
+   * Delegate reader creation to the wrapped format that produced the split.
+   *
+   * @param inputSplit Split produced by {@link #getSplits}
+   * @param context Task context
+   * @return Vertex reader of the owning input format
+   * @throws IOException if the wrapped format fails to create a reader
+   */
+  @Override
+  public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    if (!(inputSplit instanceof InputSplitWithInputFormatIndex)) {
+      throw new IllegalStateException("createVertexReader: Got InputSplit " +
+          "which was not created by this class: " +
+          inputSplit.getClass().getName());
+    }
+    InputSplitWithInputFormatIndex indexedSplit =
+        (InputSplitWithInputFormatIndex) inputSplit;
+    return vertexInputFormats.get(indexedSplit.getInputFormatIndex())
+        .createVertexReader(indexedSplit.getSplit(), context);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context,
+      int minSplitCountHint) throws IOException, InterruptedException {
+    return MultiInputUtils.getSplits(
+        context, minSplitCountHint, vertexInputFormats);
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+      DataOutput dataOutput) throws IOException {
+    MultiInputUtils.writeInputSplit(
+        inputSplit, dataOutput, vertexInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+      DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, vertexInputFormats);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
new file mode 100644
index 0000000..bdd5a74
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the vertex input format - holds vertex input format class and
+ * all parameters specifically set for that vertex input format.
+ *
+ * Used only with {@link MultiVertexInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class VertexInputFormatDescription<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends InputFormatDescription<VertexInputFormat<I, V, E>> {
+ /**
+ * VertexInputFormats description - JSON array containing a JSON array for
+ * each vertex input. Vertex input JSON arrays contain one or two elements -
+ * first one is the name of vertex input class, and second one is JSON object
+ * with all specific parameters for this vertex input. For example:
+ * [["VIF1",{"p":"v1"}],["VIF2",{"p":"v2","q":"v"}]]
+ */
+ public static final StrConfOption VERTEX_INPUT_FORMAT_DESCRIPTIONS =
+ new StrConfOption("giraph.multiVertexInput.descriptions", null);
+
+ /**
+ * Constructor with vertex input format class
+ *
+ * @param vertexInputFormatClass Vertex input format class
+ */
+ public VertexInputFormatDescription(
+ Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass) {
+ super(vertexInputFormatClass);
+ }
+
+ /**
+ * Constructor with json string describing this input format
+ *
+ * @param description Json string describing this input format
+ */
+ public VertexInputFormatDescription(String description) {
+ super(description);
+ }
+
+ /**
+ * Create a copy of configuration which additionally has all parameters for
+ * this input format set
+ *
+ * @param conf Configuration which we want to create a copy from
+ * @return Copy of configuration
+ */
+ private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+ createConfigurationCopy(
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+ new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(conf);
+ confCopy.setVertexInputFormatClass(getInputFormatClass());
+ putParametersToConfiguration(confCopy);
+ return confCopy;
+ }
+
+ /**
+ * Get descriptions of vertex input formats from configuration (an empty
+ * list if the option is unset); array elements are forwarded as JSON text.
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @return List of vertex input format descriptions
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable>
+ List<VertexInputFormatDescription<I, V, E>> getVertexInputFormatDescriptions(
+ Configuration conf) {
+ String vertexInputFormatDescriptions =
+ VERTEX_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+ if (vertexInputFormatDescriptions == null) {
+ return Lists.newArrayList();
+ }
+ try {
+ JSONArray inputFormatsJson = new JSONArray(vertexInputFormatDescriptions);
+ List<VertexInputFormatDescription<I, V, E>> descriptions =
+ Lists.newArrayListWithCapacity(inputFormatsJson.length());
+ for (int i = 0; i < inputFormatsJson.length(); i++) {
+ descriptions.add(new VertexInputFormatDescription<I, V, E>(
+ inputFormatsJson.get(i).toString()));
+ }
+ return descriptions;
+ } catch (JSONException e) {
+ throw new IllegalStateException("getVertexInputFormatDescriptions: " +
+ "JSONException occurred while trying to process " +
+ vertexInputFormatDescriptions, e);
+ }
+ }
+
+ /**
+ * Create all vertex input formats
+ *
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @return List with all vertex input formats
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable>
+ List<VertexInputFormat<I, V, E>> createVertexInputFormats(
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ List<VertexInputFormatDescription<I, V, E>> descriptions =
+ getVertexInputFormatDescriptions(conf);
+ List<VertexInputFormat<I, V, E>> vertexInputFormats =
+ Lists.newArrayListWithCapacity(descriptions.size());
+ for (VertexInputFormatDescription<I, V, E> description : descriptions) {
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+ description.createConfigurationCopy(conf);
+ vertexInputFormats.add(confCopy.createWrappedVertexInputFormat());
+ }
+ return vertexInputFormats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
new file mode 100644
index 0000000..2611e9c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of input formats which wrap several other input formats, allowing
+ * data to be read from multiple sources.
+ */
+package org.apache.giraph.io.formats.multi;
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 9b14727..928b975 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@@ -74,6 +76,7 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf);
super.setConf(conf);
edgeReader.setConf(conf);
}
@@ -113,4 +116,16 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
}
};
}
+
+ @Override
+ public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
+ throws IOException {
+ originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+ }
+
+ @Override
+ public InputSplit readInputSplit(DataInput dataInput)
+ throws IOException, ClassNotFoundException {
+ return originalInputFormat.readInputSplit(dataInput);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index 4f0cfea..ed606e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@@ -76,6 +78,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ WrappedVertexInputFormat.this.getConf().updateConfiguration(conf);
super.setConf(conf);
vertexReader.setConf(conf);
}
@@ -110,4 +113,16 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
}
};
}
+
+ @Override
+ public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
+ throws IOException {
+ originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+ }
+
+ @Override
+ public InputSplit readInputSplit(DataInput dataInput)
+ throws IOException, ClassNotFoundException {
+ return originalInputFormat.readInputSplit(dataInput);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 2c1a679..f00116a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -654,7 +654,8 @@ public class BspServiceMaster<I extends WritableComparable,
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
taskExecutor.submit(new LogStacktraceCallable<Void>(
- new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations)));
+ new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
+ writeLocations)));
}
taskExecutor.shutdown();
ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
@@ -1872,6 +1873,8 @@ public class BspServiceMaster<I extends WritableComparable,
* Upon failure call() throws an exception.
*/
private class WriteInputSplit implements Callable<Void> {
+ /** Input format */
+ private final GiraphInputFormat inputFormat;
/** Input split which we are going to write */
private final InputSplit inputSplit;
/** Input splits path */
@@ -1884,6 +1887,7 @@ public class BspServiceMaster<I extends WritableComparable,
/**
* Constructor
*
+ * @param inputFormat Input format
* @param inputSplit Input split which we are going to write
* @param inputSplitsPath Input splits path
* @param index Index of the input split
@@ -1891,10 +1895,12 @@ public class BspServiceMaster<I extends WritableComparable,
* be used by workers for prioritizing local splits
* when reading)
*/
- public WriteInputSplit(InputSplit inputSplit,
+ public WriteInputSplit(GiraphInputFormat inputFormat,
+ InputSplit inputSplit,
String inputSplitsPath,
int index,
boolean writeLocations) {
+ this.inputFormat = inputFormat;
this.inputSplit = inputSplit;
this.inputSplitsPath = inputSplitsPath;
this.index = index;
@@ -1926,9 +1932,7 @@ public class BspServiceMaster<I extends WritableComparable,
locations == null ? "" : locations.toString());
}
- Text.writeString(outputStream,
- inputSplit.getClass().getName());
- ((Writable) inputSplit).write(outputStream);
+ inputFormat.writeInputSplit(inputSplit, outputStream);
inputSplitPath = inputSplitsPath + "/" + index;
getZkExt().createExt(inputSplitPath,
byteArrayOutputStream.toByteArray(),
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 01937ab..51edbac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -313,6 +313,7 @@ public class BspServiceWorker<I extends WritableComparable,
VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
new VertexInputSplitsCallableFactory<I, V, E, M>(
+ getConfiguration().createWrappedVertexInputFormat(),
getContext(),
graphState,
getConfiguration(),
@@ -350,6 +351,7 @@ public class BspServiceWorker<I extends WritableComparable,
EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
new EdgeInputSplitsCallableFactory<I, V, E, M>(
+ getConfiguration().createWrappedEdgeInputFormat(),
getContext(),
graphState,
getConfiguration(),
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index de1e774..83fe5ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -57,6 +57,8 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
/** Class logger */
private static final Logger LOG = Logger.getLogger(
EdgeInputSplitsCallable.class);
+ /** Edge input format */
+ private final EdgeInputFormat<I, E> edgeInputFormat;
/** Input split max edges (-1 denotes all) */
private final long inputSplitMaxEdges;
@@ -67,6 +69,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
/**
* Constructor.
*
+ * @param edgeInputFormat Edge input format
* @param context Context
* @param graphState Graph state
* @param configuration Configuration
@@ -75,6 +78,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
* @param zooKeeperExt Handle to ZooKeeperExt
*/
public EdgeInputSplitsCallable(
+ EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -83,6 +87,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
ZooKeeperExt zooKeeperExt) {
super(context, graphState, configuration, bspServiceWorker,
splitsHandler, zooKeeperExt);
+ this.edgeInputFormat = edgeInputFormat;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
@@ -90,6 +95,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
totalEdgesMeter = getTotalEdgesLoadedMeter();
}
+ @Override
+ public EdgeInputFormat<I, E> getInputFormat() {
+ return this.edgeInputFormat;
+ }
+
/**
* Read edges from input split. If testing, the user may request a
* maximum number of edges to be read from an input split.
@@ -105,8 +115,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
InputSplit inputSplit,
GraphState<I, V, E, M> graphState) throws IOException,
InterruptedException {
- EdgeInputFormat<I, E> edgeInputFormat =
- configuration.createWrappedEdgeInputFormat();
EdgeReader<I, E> edgeReader =
edgeInputFormat.createEdgeReader(inputSplit, context);
edgeReader.setConf(
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 4a1705b..33fb515 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements CallableFactory<VertexEdgeCount> {
+ /** Edge input format */
+ private final EdgeInputFormat<I, E> edgeInputFormat;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state. */
@@ -54,6 +57,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
/**
* Constructor.
*
+ * @param edgeInputFormat Edge input format
* @param context Mapper context
* @param graphState Graph state
* @param configuration Configuration
@@ -62,12 +66,14 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
* @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public EdgeInputSplitsCallableFactory(
+ EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
BspServiceWorker<I, V, E, M> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
+ this.edgeInputFormat = edgeInputFormat;
this.context = context;
this.graphState = graphState;
this.configuration = configuration;
@@ -79,6 +85,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
@Override
public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
return new EdgeInputSplitsCallable<I, V, E, M>(
+ edgeInputFormat,
context,
graphState,
configuration,
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index a3a9ab7..f7a8340 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MeterDesc;
import org.apache.giraph.time.SystemTime;
@@ -34,7 +35,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -118,6 +118,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
// CHECKSTYLE: resume ParameterNumberCheck
/**
+ * Get the input format used to deserialize the input splits which this
+ * callable reserves.
+ * @return Input format
+ */
+ public abstract GiraphInputFormat getInputFormat();
+
+ /**
* Get Meter tracking edges loaded
*
* @return Meter tracking edges loaded
@@ -255,12 +262,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
if (useLocality) {
Text.readString(inputStream); // location data unused here, skip
}
- String inputSplitClass = Text.readString(inputStream);
- InputSplit inputSplit = (InputSplit)
- ReflectionUtils.newInstance(
- configuration.getClassByName(inputSplitClass),
- configuration);
- ((Writable) inputSplit).readFields(inputStream);
+ InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
if (LOG.isInfoEnabled()) {
LOG.info("getInputSplit: Reserved " + inputSplitPath +
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 224856e..d32ccaf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.partition.PartitionOwner;
@@ -57,6 +58,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
/** Class logger */
private static final Logger LOG =
Logger.getLogger(VertexInputSplitsCallable.class);
+ /** Vertex input format */
+ private final VertexInputFormat<I, V, E> vertexInputFormat;
/** Input split max vertices (-1 denotes all) */
private final long inputSplitMaxVertices;
/** Bsp service worker (only use thread-safe methods) */
@@ -71,6 +74,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
/**
* Constructor.
*
+ * @param vertexInputFormat Vertex input format
* @param context Context
* @param graphState Graph state
* @param configuration Configuration
@@ -79,6 +83,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* @param zooKeeperExt Handle to ZooKeeperExt
*/
public VertexInputSplitsCallable(
+ VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
@@ -87,6 +92,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
ZooKeeperExt zooKeeperExt) {
super(context, graphState, configuration, bspServiceWorker,
splitsHandler, zooKeeperExt);
+ this.vertexInputFormat = vertexInputFormat;
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
this.bspServiceWorker = bspServiceWorker;
@@ -96,6 +102,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
totalEdgesMeter = getTotalEdgesLoadedMeter();
}
+ @Override
+ public GiraphInputFormat getInputFormat() {
+ return this.vertexInputFormat;
+ }
+
/**
* Read vertices from input split. If testing, the user may request a
* maximum number of vertices to be read from an input split.
@@ -111,8 +122,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
InputSplit inputSplit,
GraphState<I, V, E, M> graphState)
throws IOException, InterruptedException {
- VertexInputFormat<I, V, E> vertexInputFormat =
- configuration.createWrappedVertexInputFormat();
VertexReader<I, V, E> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, context);
vertexReader.setConf(
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 4eff3b8..cf5e8ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapreduce.Mapper;
public class VertexInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements CallableFactory<VertexEdgeCount> {
+ /** Vertex input format */
+ private final VertexInputFormat<I, V, E> vertexInputFormat;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state. */
@@ -54,6 +57,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
/**
* Constructor.
*
+ * @param vertexInputFormat Vertex input format
* @param context Mapper context
* @param graphState Graph state
* @param configuration Configuration
@@ -62,12 +66,14 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
* @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public VertexInputSplitsCallableFactory(
+ VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
BspServiceWorker<I, V, E, M> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
+ this.vertexInputFormat = vertexInputFormat;
this.context = context;
this.graphState = graphState;
this.configuration = configuration;
@@ -79,6 +85,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
@Override
public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
return new VertexInputSplitsCallable<I, V, E, M>(
+ vertexInputFormat,
context,
graphState,
configuration,