You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/26 22:37:52 UTC
git commit: GIRAPH-: GiraphInputSplit (nitay)
Updated Branches:
refs/heads/nitay/inputsplit [created] f38d2b885
GIRAPH-: GiraphInputSplit (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f38d2b88
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f38d2b88
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f38d2b88
Branch: refs/heads/nitay/inputsplit
Commit: f38d2b8853ab3f24e560be5a1c104ca501ba55f4
Parents: 4585139
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Dec 19 14:53:51 2012 -0800
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Dec 19 20:48:21 2012 -0800
----------------------------------------------------------------------
.../LongDoubleFloatDoubleTextInputFormat.java | 4 +-
...lizingLongDoubleFloatDoubleTextInputFormat.java | 4 +-
.../giraph/examples/SimplePageRankVertex.java | 4 +-
.../giraph/examples/SimpleSuperstepVertex.java | 4 +-
.../org/apache/giraph/graph/BspServiceMaster.java | 26 +++++++-------
.../org/apache/giraph/graph/EdgeInputFormat.java | 27 ++------------
.../org/apache/giraph/graph/GiraphInputFormat.java | 24 +++++++++----
.../org/apache/giraph/graph/VertexInputFormat.java | 29 ++-------------
.../java/org/apache/giraph/graph/VertexReader.java | 10 -----
.../giraph/graph/VertexValueInputFormat.java | 3 +-
.../io/AdjacencyListTextVertexInputFormat.java | 3 +-
.../apache/giraph/io/GiraphFileInputFormat.java | 27 ++++++++------
.../giraph/io/IntIntNullIntTextInputFormat.java | 3 +-
.../giraph/io/IntNullNullNullTextInputFormat.java | 4 +-
.../giraph/io/IntNullTextEdgeInputFormat.java | 4 +-
.../giraph/io/JsonBase64VertexInputFormat.java | 3 +-
...JsonLongDoubleFloatDoubleVertexInputFormat.java | 4 +-
...DoubleDoubleAdjacencyListVertexInputFormat.java | 4 +-
.../giraph/io/PseudoRandomVertexInputFormat.java | 3 +-
.../giraph/io/SequenceFileVertexInputFormat.java | 3 +-
...DoubleDoubleAdjacencyListVertexInputFormat.java | 4 +-
.../org/apache/giraph/io/TextEdgeInputFormat.java | 5 ++-
.../apache/giraph/io/TextVertexInputFormat.java | 4 ++-
.../io/accumulo/AccumuloVertexInputFormat.java | 5 ++-
.../giraph/io/hcatalog/GiraphHCatInputFormat.java | 17 +++++----
.../apache/giraph/io/hcatalog/GiraphHCatSplit.java | 18 +++++++++
.../io/hcatalog/HCatalogEdgeInputFormat.java | 8 +++--
.../io/hcatalog/HCatalogVertexInputFormat.java | 3 +-
.../edgemarker/AccumuloEdgeInputFormat.java | 5 ++-
.../io/hbase/edgemarker/TableEdgeInputFormat.java | 6 +++-
.../org/apache/giraph/input/GiraphInputSplit.java | 29 +++++++++++++++
.../java/org/apache/giraph/io/GiraphFileSplit.java | 11 ++++++
pom.xml | 3 +-
33 files changed, 175 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
index b06f07f..a4ca5bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.giraph.examples;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.Lists;
@@ -45,7 +45,7 @@ public class LongDoubleFloatDoubleTextInputFormat
FloatWritable, DoubleWritable> {
@Override
- public TextVertexReader createVertexReader(InputSplit split,
+ public TextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context)
throws IOException {
return new LongDoubleFloatDoubleVertexReader();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
index 300fbb6..a2b8808 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.giraph.examples;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.Lists;
@@ -47,7 +47,7 @@ public class NormalizingLongDoubleFloatDoubleTextInputFormat
@Override
public TextVertexReader createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
+ GiraphInputSplit split, TaskAttemptContext context) throws IOException {
return new NormalizingLongDoubleFloatDoubleVertexReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
index 5fd834b..fa0f88e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
@@ -27,13 +27,13 @@ import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.GeneratedVertexInputFormat;
import org.apache.giraph.io.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
@@ -217,7 +217,7 @@ public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
DoubleWritable, FloatWritable, DoubleWritable> {
@Override
public VertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
+ FloatWritable, DoubleWritable> createVertexReader(GiraphInputSplit split,
TaskAttemptContext context)
throws IOException {
return new SimplePageRankVertexReader();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
index b166ce0..d65a211 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
@@ -22,13 +22,13 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.GeneratedVertexInputFormat;
import org.apache.giraph.io.TextVertexOutputFormat;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
@@ -120,7 +120,7 @@ public class SimpleSuperstepVertex extends
IntWritable, FloatWritable, IntWritable> {
@Override
public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
+ createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
throws IOException {
return new SimpleSuperstepVertexReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
index ee64a46..91f1abc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
@@ -35,6 +35,7 @@ import org.apache.giraph.graph.partition.MasterGraphPartitioner;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.metrics.GiraphMetrics;
@@ -58,7 +59,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
@@ -257,11 +257,12 @@ public class BspServiceMaster<I extends WritableComparable,
* @param inputSplitType Type of input splits (for logging purposes)
* @return List of input splits for the given format
*/
- private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
- int numWorkers,
- String inputSplitType) {
+ private List<GiraphInputSplit> generateInputSplits(
+ GiraphInputFormat inputFormat,
+ int numWorkers,
+ String inputSplitType) {
String logPrefix = "generate" + inputSplitType + "InputSplits";
- List<InputSplit> splits;
+ List<GiraphInputSplit> splits;
try {
splits = inputFormat.getSplits(getContext(), numWorkers);
} catch (IOException e) {
@@ -277,7 +278,7 @@ public class BspServiceMaster<I extends WritableComparable,
if (samplePercent !=
GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
int lastIndex = (int) (samplePercent * splits.size() / 100f);
- List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+ List<GiraphInputSplit> sampleSplits = splits.subList(0, lastIndex);
LOG.warn(logPrefix + ": Using sampling - Processing only " +
sampleSplits.size() + " instead of " + splits.size() +
" expected splits.");
@@ -565,7 +566,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Note that the input splits may only be a sample if
// INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
- List<InputSplit> splitList = generateInputSplits(inputFormat,
+ List<GiraphInputSplit> splitList = generateInputSplits(inputFormat,
healthyWorkerInfoList.size(), inputSplitType);
if (splitList.isEmpty()) {
@@ -594,7 +595,7 @@ public class BspServiceMaster<I extends WritableComparable,
ExecutorService taskExecutor =
Executors.newFixedThreadPool(inputSplitThreadCount);
for (int i = 0; i < splitList.size(); ++i) {
- InputSplit inputSplit = splitList.get(i);
+ GiraphInputSplit inputSplit = splitList.get(i);
taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
}
taskExecutor.shutdown();
@@ -1792,7 +1793,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private class WriteInputSplit implements Callable<Void> {
/** Input split which we are going to write */
- private final InputSplit inputSplit;
+ private final GiraphInputSplit inputSplit;
/** Input splits path */
private final String inputSplitsPath;
/** Index of the input split */
@@ -1805,7 +1806,7 @@ public class BspServiceMaster<I extends WritableComparable,
* @param inputSplitsPath Input splits path
* @param index Index of the input split
*/
- public WriteInputSplit(InputSplit inputSplit,
+ public WriteInputSplit(GiraphInputSplit inputSplit,
String inputSplitsPath,
int index) {
this.inputSplit = inputSplit;
@@ -1835,9 +1836,8 @@ public class BspServiceMaster<I extends WritableComparable,
}
Text.writeString(outputStream,
locations == null ? "" : locations.toString());
- Text.writeString(outputStream,
- inputSplit.getClass().getName());
- ((Writable) inputSplit).write(outputStream);
+ Text.writeString(outputStream, inputSplit.getClass().getName());
+ inputSplit.write(outputStream);
inputSplitPath = inputSplitsPath + "/" + index;
getZkExt().createExt(inputSplitPath,
byteArrayOutputStream.toByteArray(),
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
index cdc1891..f32a3e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,33 +37,11 @@ import java.util.List;
public abstract class EdgeInputFormat<I extends WritableComparable,
E extends Writable> implements GiraphInputFormat {
/**
- * Logically split the vertices for a graph processing application.
- *
- * Each {@link InputSplit} is then assigned to a worker for processing.
- *
- * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
- * input files are not physically split into chunks. For e.g. a split could
- * be <i><input-file-path, start, offset></i> tuple. The InputFormat
- * also creates the {@link VertexReader} to read the {@link InputSplit}.
- *
- * Also, the number of workers is a hint given to the developer to try to
- * intelligently determine how many splits to create (if this is
- * adjustable) at runtime.
- *
- * @param context Context of the job
- * @param numWorkers Number of workers used for this job
- * @return an array of {@link InputSplit}s for the job.
- */
- @Override
- public abstract List<InputSplit> getSplits(
- JobContext context, int numWorkers) throws IOException,
- InterruptedException;
-
- /**
* Create an edge reader for a given split. The framework will call
* {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
*
+ *
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
@@ -70,6 +49,6 @@ public abstract class EdgeInputFormat<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract EdgeReader<I, E> createEdgeReader(
- InputSplit split,
+ GiraphInputSplit split,
TaskAttemptContext context) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
index e1cc844..0a7223a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -29,14 +30,23 @@ import java.util.List;
*/
public interface GiraphInputFormat {
/**
- * Get the list of input splits for the format.
+ * Logically split the vertices for a graph processing application.
*
- * @param context The job context
- * @param numWorkers Number of workers
- * @return The list of input splits
- * @throws IOException
- * @throws InterruptedException
+ * Each {@link InputSplit} is then assigned to a worker for processing.
+ *
+ * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+ * input files are not physically split into chunks. For e.g. a split could
+ * be <i><input-file-path, start, offset></i> tuple. The InputFormat
+ * also creates the {@link VertexReader} to read the {@link InputSplit}.
+ *
+ * Also, the number of workers is a hint given to the developer to try to
+ * intelligently determine how many splits to create (if this is
+ * adjustable) at runtime.
+ *
+ * @param context Context of the job
+ * @param numWorkers Number of workers used for this job
+ * @return a list of {@link GiraphInputSplit}s for the job.
*/
- List<InputSplit> getSplits(JobContext context, int numWorkers)
+ List<GiraphInputSplit> getSplits(JobContext context, int numWorkers)
throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
index 9824b69..78bc292 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
@@ -18,14 +18,13 @@
package org.apache.giraph.graph;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
-import java.util.List;
/**
* Use this to load data for a BSP application. Note that the InputSplit must
@@ -43,33 +42,11 @@ public abstract class VertexInputFormat<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements GiraphInputFormat {
/**
- * Logically split the vertices for a graph processing application.
- *
- * Each {@link InputSplit} is then assigned to a worker for processing.
- *
- * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
- * input files are not physically split into chunks. For e.g. a split could
- * be <i><input-file-path, start, offset></i> tuple. The InputFormat
- * also creates the {@link VertexReader} to read the {@link InputSplit}.
- *
- * Also, the number of workers is a hint given to the developer to try to
- * intelligently determine how many splits to create (if this is
- * adjustable) at runtime.
- *
- * @param context Context of the job
- * @param numWorkers Number of workers used for this job
- * @return an array of {@link InputSplit}s for the job.
- */
- @Override
- public abstract List<InputSplit> getSplits(
- JobContext context, int numWorkers)
- throws IOException, InterruptedException;
-
- /**
* Create a vertex reader for a given split. The framework will call
* {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
*
+ *
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
@@ -77,6 +54,6 @@ public abstract class VertexInputFormat<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract VertexReader<I, V, E, M> createVertexReader(
- InputSplit split,
+ GiraphInputSplit split,
TaskAttemptContext context) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
index 77801cc..f8b59b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
@@ -74,14 +74,4 @@ public interface VertexReader<I extends WritableComparable,
* @throws IOException
*/
void close() throws IOException;
-
- /**
- * How much of the input has the {@link VertexReader} consumed i.e.
- * has been processed by?
- *
- * @return Progress from <code>0.0</code> to <code>1.0</code>.
- * @throws IOException
- * @throws InterruptedException
- */
- float getProgress() throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
index 804d23e..deab07e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,7 +53,7 @@ public abstract class VertexValueInputFormat<I extends WritableComparable,
@Override
public final VertexReader<I, V, E, M> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
+ GiraphInputSplit split, TaskAttemptContext context) throws IOException {
return createVertexValueReader(split, context);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
index 3b047f3..802d30f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -67,7 +68,7 @@ public abstract class AdjacencyListTextVertexInputFormat<I extends
@Override
public abstract AdjacencyListTextVertexReader createVertexReader(
- InputSplit split, TaskAttemptContext context);
+ GiraphInputSplit split, TaskAttemptContext context);
/**
* Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
index 114e75f..4b2a9a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
@@ -18,21 +18,22 @@
package org.apache.giraph.io;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -265,13 +266,13 @@ end[HADOOP_NON_SECURE]*/
* @return The list of vertex/edge input splits
* @throws IOException
*/
- private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+ private List<GiraphInputSplit> getSplits(JobContext job, List<FileStatus> files)
throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<GiraphInputSplit> splits = Lists.newArrayList();
for (FileStatus file: files) {
Path path = file.getPath();
@@ -285,21 +286,22 @@ end[HADOOP_NON_SECURE]*/
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ splits.add(new GiraphFileSplit(path, length - bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length - bytesRemaining,
+ splits.add(new GiraphFileSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkLocations.length - 1].getHosts()));
}
} else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ splits.add(new GiraphFileSplit(path, 0, length,
+ blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
+ splits.add(new GiraphFileSplit(path, 0, length, new String[0]));
}
}
return splits;
@@ -312,9 +314,10 @@ end[HADOOP_NON_SECURE]*/
* @return The list of vertex input splits
* @throws IOException
*/
- public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+ public List<GiraphInputSplit> getVertexSplits(JobContext job)
+ throws IOException {
List<FileStatus> files = listVertexStatus(job);
- List<InputSplit> splits = getSplits(job, files);
+ List<GiraphInputSplit> splits = getSplits(job, files);
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
LOG.debug("Total # of vertex splits: " + splits.size());
@@ -328,9 +331,9 @@ end[HADOOP_NON_SECURE]*/
* @return The list of edge input splits
* @throws IOException
*/
- public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+ public List<GiraphInputSplit> getEdgeSplits(JobContext job) throws IOException {
List<FileStatus> files = listEdgeStatus(job);
- List<InputSplit> splits = getSplits(job, files);
+ List<GiraphInputSplit> splits = getSplits(job, files);
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
LOG.debug("Total # of edge splits: " + splits.size());
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
index 9aa21c7..90a5b50 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -44,7 +45,7 @@ public class IntIntNullIntTextInputFormat extends
private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
@Override
- public TextVertexReader createVertexReader(InputSplit split,
+ public TextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context)
throws IOException {
return new IntIntNullIntVertexReader();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
index 4d98657..e03dc6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
@@ -17,11 +17,11 @@
*/
package org.apache.giraph.io;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.ImmutableList;
@@ -38,7 +38,7 @@ public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
IntWritable, NullWritable, NullWritable, NullWritable> {
@Override
public TextVertexReader createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
+ GiraphInputSplit split, TaskAttemptContext context) throws IOException {
return new IntNullNullNullVertexReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
index ed13b45..6805e45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
@@ -18,11 +18,11 @@
package org.apache.giraph.io;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.utils.IntPair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
@@ -41,7 +41,7 @@ public class IntNullTextEdgeInputFormat extends
@Override
public TextEdgeReader createEdgeReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
+ GiraphInputSplit split, TaskAttemptContext context) throws IOException {
return new IntNullTextEdgeReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
index cc5872c..74604f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.io;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -54,7 +55,7 @@ public class JsonBase64VertexInputFormat<I extends WritableComparable,
extends TextVertexInputFormat<I, V, E, M> {
@Override
- public TextVertexReader createVertexReader(InputSplit split,
+ public TextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) {
return new JsonBase64VertexReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
index c01d442..3f98807 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -19,11 +19,11 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
@@ -44,7 +44,7 @@ public class JsonLongDoubleFloatDoubleVertexInputFormat extends
FloatWritable, DoubleWritable> {
@Override
- public TextVertexReader createVertexReader(InputSplit split,
+ public TextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) {
return new JsonLongDoubleFloatDoubleVertexReader();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
index acae10b..bd79b61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -18,10 +18,10 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
@@ -39,7 +39,7 @@ public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
DoubleWritable, M> {
@Override
- public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+ public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) {
return new LongDoubleDoubleAdjacencyListVertexReader(null);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
index bac0a39..a2715e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -72,7 +73,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
@Override
public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
- createVertexReader(InputSplit split, TaskAttemptContext context)
+ createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
throws IOException {
return new PseudoRandomVertexReader<M>();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
index a984089..c8e575f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -56,7 +57,9 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
}
@Override
- public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+ public VertexReader<I, V, E, M> createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) throws IOException {
+ // GiraphInputSplit is not a Hadoop InputSplit; unwrap it for Hadoop.
return new SequenceFileVertexReader<I, V, E, M, X>(
- sequenceFileInputFormat.createRecordReader(split, context));
+ sequenceFileInputFormat.createRecordReader(
+ split.getInputSplit(), context));
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index a009000..8307dc8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -18,10 +18,10 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
@@ -36,7 +36,7 @@ public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
DoubleWritable, M> {
@Override
- public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+ public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) {
return new TextDoubleDoubleAdjacencyListVertexReader(null);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
index e4cbf94..6dd059c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeInputFormat;
import org.apache.giraph.graph.EdgeReader;
import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -50,7 +51,7 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
@Override
- public List<InputSplit> getSplits(
+ public List<GiraphInputSplit> getSplits(
JobContext context, int numWorkers) throws IOException,
InterruptedException {
// Ignore the hint of numWorkers here since we are using
@@ -60,7 +61,7 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
@Override
public abstract TextEdgeReader createEdgeReader(
- InputSplit split, TaskAttemptContext context) throws IOException;
+ GiraphInputSplit split, TaskAttemptContext context) throws IOException;
/**
* {@link EdgeReader} for {@link TextEdgeInputFormat}.
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
index e085473..0c1aaca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -65,6 +66,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
* The factory method which produces the {@link TextVertexReader} used by this
* input format.
*
+ *
* @param split
* the split to be read
* @param context
@@ -73,7 +75,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
* the text vertex reader to be used
*/
@Override
- public abstract TextVertexReader createVertexReader(InputSplit split,
+ public abstract TextVertexReader createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
index 92328b7..90cfd51 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -160,10 +161,10 @@ public abstract class AccumuloVertexInputFormat<
* @throws InterruptedException
*/
@Override
- public List<InputSplit> getSplits(
+ public List<GiraphInputSplit> getSplits(
JobContext context, int numWorkers)
throws IOException, InterruptedException {
- List<InputSplit> splits = null;
+ List<GiraphInputSplit> splits = null;
try {
splits = accumuloInputFormat.getSplits(context);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
index 2e91cba..ea1733a 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.giraph.io.hcatalog;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +42,8 @@ import org.apache.hcatalog.mapreduce.HCatUtils;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -211,12 +214,12 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
* @throws IOException
* @throws InterruptedException
*/
- private List<InputSplit> getSplits(JobContext jobContext,
+ private List<GiraphInputSplit> getSplits(JobContext jobContext,
InputJobInfo inputJobInfo)
throws IOException, InterruptedException {
Configuration conf = jobContext.getConfiguration();
- List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<GiraphInputSplit> splits = Lists.newArrayList();
List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
if (partitionInfoList == null) {
//No partitions match the specified partition filter
@@ -264,7 +267,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
inputFormat.getSplits(jobConf, desiredNumSplits);
for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
- splits.add(new HCatSplit(partitionInfo, split, allCols));
+ splits.add(new GiraphHCatSplit(partitionInfo, split, allCols));
}
}
@@ -313,7 +316,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
* @throws IOException
* @throws InterruptedException
*/
- public List<InputSplit> getVertexSplits(JobContext jobContext)
+ public List<GiraphInputSplit> getVertexSplits(JobContext jobContext)
throws IOException, InterruptedException {
return getSplits(jobContext,
getVertexJobInfo(jobContext.getConfiguration()));
@@ -327,7 +330,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
* @throws IOException
* @throws InterruptedException
*/
- public List<InputSplit> getEdgeSplits(JobContext jobContext)
+ public List<GiraphInputSplit> getEdgeSplits(JobContext jobContext)
throws IOException, InterruptedException {
return getSplits(jobContext,
getEdgeJobInfo(jobContext.getConfiguration()));
@@ -344,7 +347,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
* @throws InterruptedException
*/
private RecordReader<WritableComparable, HCatRecord>
- createRecordReader(InputSplit split,
+ createRecordReader(GiraphInputSplit split,
HCatSchema schema,
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
@@ -392,7 +395,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
* @throws InterruptedException
*/
public RecordReader<WritableComparable, HCatRecord>
- createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
+ createEdgeRecordReader(GiraphInputSplit split, TaskAttemptContext taskContext)
throws IOException, InterruptedException {
return createRecordReader(split, getEdgeTableSchema(
taskContext.getConfiguration()), taskContext);
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
new file mode 100644
index 0000000..1f2515e
--- /dev/null
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.mapreduce.PartInfo;
+
+/**
+ * {@link HCatSplit} that also implements {@link GiraphInputSplit}, so that
+ * the splits created by {@link GiraphHCatInputFormat} can be used by Giraph.
+ */
+public class GiraphHCatSplit extends HCatSplit implements GiraphInputSplit {
+  /**
+   * No-argument constructor. Hadoop {@code Writable} types are normally
+   * instantiated this way and then populated via {@code readFields()}.
+   */
+  public GiraphHCatSplit() {
+    super();
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param partitionInfo partition information for this split
+   * @param baseMapRedSplit underlying old-API (mapred) input split
+   * @param tableSchema table schema, passed through to {@link HCatSplit}
+   */
+  public GiraphHCatSplit(PartInfo partitionInfo, InputSplit baseMapRedSplit,
+      HCatSchema tableSchema) {
+    super(partitionInfo, baseMapRedSplit, tableSchema);
+  }
+
+  /**
+   * An {@link HCatSplit} already is a (new-API) Hadoop input split, so this
+   * split is returned as-is.
+   *
+   * @return this split
+   */
+  @Override
+  public org.apache.hadoop.mapreduce.InputSplit getInputSplit() {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
index 2112df3..4f52405 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeInputFormat;
import org.apache.giraph.graph.EdgeReader;
import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -49,7 +50,8 @@ public abstract class HCatalogEdgeInputFormat<
private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
@Override
- public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+ public final List<GiraphInputSplit> getSplits(
+ JobContext context, int numWorkers)
throws IOException, InterruptedException {
return hCatInputFormat.getEdgeSplits(context);
}
@@ -64,7 +66,7 @@ public abstract class HCatalogEdgeInputFormat<
private TaskAttemptContext context;
@Override
- public final void initialize(InputSplit inputSplit,
+ public final void initialize(GiraphInputSplit inputSplit,
TaskAttemptContext context)
throws IOException, InterruptedException {
hCatRecordReader =
@@ -117,7 +119,7 @@ public abstract class HCatalogEdgeInputFormat<
@Override
public EdgeReader<I, E>
- createEdgeReader(InputSplit split, TaskAttemptContext context)
+ createEdgeReader(GiraphInputSplit split, TaskAttemptContext context)
throws IOException {
try {
HCatalogEdgeReader reader = createEdgeReader();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
index ec49137..2c2d78c 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -169,7 +170,7 @@ public abstract class HCatalogVertexInputFormat<
@Override
public final VertexReader<I, V, E, M>
- createVertexReader(final InputSplit split,
+ createVertexReader(final GiraphInputSplit split,
final TaskAttemptContext context)
throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index b670144..7e05697 100644
--- a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -20,12 +20,12 @@ package org.apache.giraph.io.accumulo.edgemarker;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,7 +44,7 @@ public class AccumuloEdgeInputFormat
private static final Text uselessEdgeValue = new Text();
private Configuration conf;
public VertexReader<Text, Text, Text, Text>
- createVertexReader(InputSplit split, TaskAttemptContext context)
+ createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
index e4e08d6..f766ac0 100644
--- a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
+++ b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
@@ -18,6 +18,9 @@
package org.apache.giraph.io.hbase.edgemarker;
import org.apache.giraph.graph.Edge;
+import com.google.common.collect.Maps;
+
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
@@ -45,7 +48,7 @@ public class TableEdgeInputFormat extends
private static final Text uselessEdgeValue = new Text();
public VertexReader<Text, Text, Text, Text>
- createVertexReader(InputSplit split,
+ createVertexReader(GiraphInputSplit split,
TaskAttemptContext context) throws IOException {
return new TableEdgeVertexReader(split, context);
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
new file mode 100644
index 0000000..f6c709b
--- /dev/null
+++ b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.input;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * A split of input data, as returned by the {@code getSplits()} methods of
+ * Giraph input formats. Like Hadoop's {@link InputSplit} it exposes
+ * {@link #getLocations()}, but it is an interface, so split classes that
+ * already extend another class can implement it.
+ *
+ * @see org.apache.hadoop.mapreduce.InputSplit
+ */
+public interface GiraphInputSplit extends Writable {
+  /**
+   * Get the list of nodes by name where the data for the split would be local.
+   * The locations do not need to be serialized.
+   *
+   * @return a new array of the node names
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  String[] getLocations() throws IOException, InterruptedException;
+
+  /**
+   * Get the Hadoop-compatible {@link InputSplit} for this split. Ideally this
+   * would not be needed, but many input formats Giraph deals with strictly
+   * require an {@link InputSplit}, which is an abstract class rather than an
+   * interface and so cannot be mixed into arbitrary split types.
+   *
+   * @return the Hadoop InputSplit; for most implementations just "this"
+   */
+  InputSplit getInputSplit();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
new file mode 100644
index 0000000..2e00ebb
--- /dev/null
+++ b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * {@link FileSplit} that also implements {@link GiraphInputSplit}, so that
+ * file-based input splits can be used by Giraph.
+ */
+public class GiraphFileSplit extends FileSplit implements GiraphInputSplit {
+  /**
+   * Constructor.
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   */
+  public GiraphFileSplit(Path file, long start, long length, String[] hosts) {
+    super(file, start, length, hosts);
+  }
+
+  /**
+   * A {@link FileSplit} already is a Hadoop {@link InputSplit}, so this split
+   * is returned as-is.
+   *
+   * @return this split
+   */
+  @Override
+  public InputSplit getInputSplit() {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0be32c5..41eab1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -810,7 +810,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>