You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:03 UTC
[07/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 2c5f2f7..762802b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -41,8 +41,8 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
* @return Casted configuration
*/
@Override
- public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
- return (ImmutableClassesGiraphConfiguration<I, ?, E, ?>) super.getConf();
+ public ImmutableClassesGiraphConfiguration<I, ?, E> getConf() {
+ return (ImmutableClassesGiraphConfiguration<I, ?, E>) super.getConf();
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0280c58..6b4642c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -17,7 +17,6 @@
*/
package org.apache.giraph.utils;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -34,10 +33,22 @@ import java.io.IOException;
@SuppressWarnings("unchecked")
public class ByteArrayVertexIdMessages<I extends WritableComparable,
M extends Writable> extends ByteArrayVertexIdData<I, M> {
+ /** Message value class */
+ private Class<M> messageValueClass;
/** Add the message size to the stream? (Depends on the message store) */
private boolean useMessageSizeEncoding = false;
/**
+ * Constructor that stores the message class used to create message objects
+ *
+ * @param messageValueClass Class for messages (cast unchecked to Class of M)
+ */
+ public ByteArrayVertexIdMessages(
+ Class<? extends Writable> messageValueClass) {
+ this.messageValueClass = (Class<M>) messageValueClass;
+ }
+
+ /**
* Set whether message sizes should be encoded. This should only be a
* possibility when not combining. When combining, all messages need to be
* deserialized right away, so this won't help.
@@ -50,20 +61,9 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
}
}
- /**
- * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
- * to generate message objects.
- *
- * @return Casted configuration
- */
- @Override
- public ImmutableClassesGiraphConfiguration<I, ?, ?, M> getConf() {
- return (ImmutableClassesGiraphConfiguration<I, ?, ?, M>) super.getConf();
- }
-
@Override
public M createData() {
- return getConf().createMessageValue();
+ return ReflectionUtils.newInstance(messageValueClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 6016ba4..d8b121b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.giraph.Algorithm;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.edge.OutEdges;
@@ -142,14 +143,14 @@ public final class ConfigurationUtils {
performSanityCheck(cmd);
// Args are OK; attempt to populate the GiraphConfiguration with them.
- final String vertexClassName = args[0];
+ final String computationClassName = args[0];
final int workers = Integer.parseInt(cmd.getOptionValue('w'));
- populateGiraphConfiguration(giraphConf, cmd, vertexClassName, workers);
+ populateGiraphConfiguration(giraphConf, cmd, computationClassName, workers);
// validate generic parameters chosen are correct or
// throw IllegalArgumentException, halting execution.
@SuppressWarnings("rawtypes")
- GiraphConfigurationValidator<?, ?, ?, ?> gtv =
+ GiraphConfigurationValidator<?, ?, ?, ?, ?> gtv =
new GiraphConfigurationValidator(giraphConf);
gtv.validateConfiguration();
@@ -200,15 +201,17 @@ public final class ConfigurationUtils {
* should be captured here.
* @param giraphConfiguration config for this job run
* @param cmd parsed command line options to store in giraphConfiguration
- * @param vertexClassName the vertex class (application) to run in this job.
+ * @param computationClassName the computation class (application) to run in
+ * this job.
* @param workers the number of worker tasks for this job run.
*/
private static void populateGiraphConfiguration(final GiraphConfiguration
- giraphConfiguration, final CommandLine cmd, final String vertexClassName,
+ giraphConfiguration, final CommandLine cmd,
+ final String computationClassName,
final int workers) throws ClassNotFoundException, IOException {
giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
- giraphConfiguration.setVertexClass(
- (Class<? extends Vertex>) Class.forName(vertexClassName));
+ giraphConfiguration.setComputationClass(
+ (Class<? extends Computation>) Class.forName(computationClassName));
if (cmd.hasOption("c")) {
giraphConfiguration.setCombinerClass(
(Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
index 65d99db..6ca488c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
@@ -76,9 +76,9 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
*/
private class InMemoryVertexReader extends VertexReader<I, V, E> {
/** The iterator */
- private Iterator<Vertex<I, V, E, ?>> vertexIterator;
+ private Iterator<Vertex<I, V, E>> vertexIterator;
/** Current vertex */
- private Vertex<I, V, E, ?> currentVertex;
+ private Vertex<I, V, E> currentVertex;
@Override
public void initialize(InputSplit inputSplit,
@@ -96,7 +96,7 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
}
@Override
- public Vertex<I, V, E, ?> getCurrentVertex() {
+ public Vertex<I, V, E> getCurrentVertex() {
return currentVertex;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index be2d2a9..b4920e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -101,7 +101,7 @@ public class InternalVertexRunner {
File tmpDir = null;
try {
// Prepare input file, output folder and temporary folders
- tmpDir = FileUtils.createTestDir(conf.getVertexClass());
+ tmpDir = FileUtils.createTestDir(conf.getComputationClass());
File vertexInputFile = null;
File edgeInputFile = null;
@@ -137,7 +137,7 @@ public class InternalVertexRunner {
GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
// Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName());
+ GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
Job internalJob = job.getInternalJob();
if (conf.hasVertexInputFormat()) {
@@ -199,7 +199,6 @@ public class InternalVertexRunner {
* @param <I> Vertex ID
* @param <V> Vertex Value
* @param <E> Edge Value
- * @param <M> Message Value
* @param conf GiraphClasses specifying which types to use
* @param graph input graph
* @return iterable output data
@@ -207,14 +206,13 @@ public class InternalVertexRunner {
*/
public static <I extends WritableComparable,
V extends Writable,
- E extends Writable,
- M extends Writable> TestGraph<I, V, E, M> run(
+ E extends Writable> TestGraph<I, V, E> run(
GiraphConfiguration conf,
- TestGraph<I, V, E, M> graph) throws Exception {
+ TestGraph<I, V, E> graph) throws Exception {
File tmpDir = null;
try {
// Prepare temporary folders
- tmpDir = FileUtils.createTestDir(conf.getVertexClass());
+ tmpDir = FileUtils.createTestDir(conf.getComputationClass());
File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
@@ -223,7 +221,7 @@ public class InternalVertexRunner {
conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
// Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName());
+ GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
InMemoryVertexInputFormat.setGraph(graph);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index d70eecb..96352bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -168,6 +168,26 @@ public class ReflectionUtils {
}
/**
+ * Instantiate a class, wrapping reflection errors in IllegalStateException
+ *
+ * @param theClass Class to instantiate; must have a no-argument constructor
+ * @param <T> Type to instantiate
+ * @return Newly instantiated object
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T newInstance(Class<T> theClass) {
+ try {
+ return theClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(
+ "newInstance: Couldn't instantiate " + theClass.getName(), e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(
+ "newInstance: Illegal access " + theClass.getName(), e);
+ }
+ }
+
+ /**
* Instantiate classes that are ImmutableClassesGiraphConfigurable
*
* @param theClass Class to instantiate
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 3577a9e..6e46a76 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -41,17 +41,15 @@ import java.util.Map.Entry;
* @param <I> Vertex index type
* @param <V> Vertex type
* @param <E> Edge type
- * @param <M> Message type
*/
public class TestGraph<I extends WritableComparable,
V extends Writable,
- E extends Writable,
- M extends Writable>
- implements Iterable<Vertex<I, V, E, M>> {
+ E extends Writable>
+ implements Iterable<Vertex<I, V, E>> {
/** The vertex values */
- private final HashMap<I, Vertex<I, V, E, M>> vertices = Maps.newHashMap();
+ private final HashMap<I, Vertex<I, V, E>> vertices = Maps.newHashMap();
/** The configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E> conf;
/**
* Constructor requiring classes
@@ -62,7 +60,7 @@ public class TestGraph<I extends WritableComparable,
this.conf = new ImmutableClassesGiraphConfiguration(conf);
}
- public HashMap<I, Vertex<I, V, E, M>> getVertices() {
+ public HashMap<I, Vertex<I, V, E>> getVertices() {
return vertices;
}
@@ -82,9 +80,9 @@ public class TestGraph<I extends WritableComparable,
* @param edges all edges
* @return this
*/
- public TestGraph<I, V, E, M> addVertex(I id, V value,
+ public TestGraph<I, V, E> addVertex(I id, V value,
Entry<I, E>... edges) {
- Vertex<I, V, E, M> v = makeVertex(id, value, edges);
+ Vertex<I, V, E> v = makeVertex(id, value, edges);
vertices.put(id, v);
return this;
}
@@ -96,9 +94,9 @@ public class TestGraph<I extends WritableComparable,
* @param edgePair The edge
* @return this
*/
- public TestGraph<I, V, E, M> addEdge(I vertexId, Entry<I, E> edgePair) {
+ public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E, M> v = conf.createVertex();
+ Vertex<I, V, E> v = conf.createVertex();
v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
@@ -116,9 +114,9 @@ public class TestGraph<I extends WritableComparable,
* @param edgeValue Edge value
* @return this
*/
- public TestGraph<I, V, E, M> addEdge(I vertexId, I toVertex, E edgeValue) {
+ public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) {
if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E, M> v = conf.createVertex();
+ Vertex<I, V, E> v = conf.createVertex();
v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
@@ -140,7 +138,7 @@ public class TestGraph<I extends WritableComparable,
*
* @return the iterator
*/
- public Iterator<Vertex<I, V, E, M>> iterator() {
+ public Iterator<Vertex<I, V, E>> iterator() {
return vertices.values().iterator();
}
@@ -150,7 +148,7 @@ public class TestGraph<I extends WritableComparable,
* @param id the id
* @return the value
*/
- public Vertex<I, V, E, M> getVertex(I id) {
+ public Vertex<I, V, E> getVertex(I id) {
return vertices.get(id);
}
@@ -177,10 +175,10 @@ public class TestGraph<I extends WritableComparable,
* @param edges edges to other vertices
* @return a new vertex
*/
- protected Vertex<I, V, E, M> makeVertex(I id, V value,
+ protected Vertex<I, V, E> makeVertex(I id, V value,
Entry<I, E>... edges) {
@SuppressWarnings("unchecked")
- Vertex<I, V, E, M> vertex = conf.createVertex();
+ Vertex<I, V, E> vertex = conf.createVertex();
vertex.initialize(id, value, createEdges(edges));
return vertex;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
index 0c9ee07..bad11d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -42,7 +42,7 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
*/
public VertexIdIterator(
ExtendedDataOutput extendedDataOutput,
- ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration) {
+ ImmutableClassesGiraphConfiguration<I, ?, ?> configuration) {
extendedDataInput = configuration.createExtendedDataInput(
extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index c607ca3..c78d717 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -56,14 +56,16 @@ public class WritableUtils {
* Read fields from byteArray to a Writable object.
*
* @param byteArray Byte array to find the fields in.
- * @param writableObject Object to fill in the fields.
+ * @param writableObjects Objects to fill in the fields.
*/
public static void readFieldsFromByteArray(
- byte[] byteArray, Writable writableObject) {
+ byte[] byteArray, Writable... writableObjects) {
DataInputStream inputStream =
new DataInputStream(new ByteArrayInputStream(byteArray));
try {
- writableObject.readFields(inputStream);
+ for (Writable writableObject : writableObjects) {
+ writableObject.readFields(inputStream);
+ }
} catch (IOException e) {
throw new IllegalStateException(
"readFieldsFromByteArray: IOException", e);
@@ -77,16 +79,16 @@ public class WritableUtils {
* @param zkPath Path of znode.
* @param watch Add a watch?
* @param stat Stat of znode if desired.
- * @param writableObject Object to read into.
+ * @param writableObjects Objects to read into.
*/
public static void readFieldsFromZnode(ZooKeeperExt zkExt,
String zkPath,
boolean watch,
Stat stat,
- Writable writableObject) {
+ Writable... writableObjects) {
try {
byte[] zkData = zkExt.getData(zkPath, false, stat);
- readFieldsFromByteArray(zkData, writableObject);
+ readFieldsFromByteArray(zkData, writableObjects);
} catch (KeeperException e) {
throw new IllegalStateException(
"readFieldsFromZnode: KeeperException on " + zkPath, e);
@@ -99,15 +101,17 @@ public class WritableUtils {
/**
* Write object to a byte array.
*
- * @param writableObject Object to write from.
+ * @param writableObjects Objects to write from.
* @return Byte array with serialized object.
*/
- public static byte[] writeToByteArray(Writable writableObject) {
+ public static byte[] writeToByteArray(Writable... writableObjects) {
ByteArrayOutputStream outputStream =
new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
try {
- writableObject.write(output);
+ for (Writable writableObject : writableObjects) {
+ writableObject.write(output);
+ }
} catch (IOException e) {
throw new IllegalStateException(
"writeToByteArray: IOStateException", e);
@@ -189,15 +193,15 @@ public class WritableUtils {
* @param zkExt ZooKeeper instance.
* @param zkPath Path of znode.
* @param version Version of the write.
- * @param writableObject Object to write from.
+ * @param writableObjects Objects to write from.
* @return Path and stat information of the znode.
*/
public static PathStat writeToZnode(ZooKeeperExt zkExt,
String zkPath,
int version,
- Writable writableObject) {
+ Writable... writableObjects) {
try {
- byte[] byteArray = writeToByteArray(writableObject);
+ byte[] byteArray = writeToByteArray(writableObjects);
return zkExt.createOrSetExt(zkPath,
byteArray,
Ids.OPEN_ACL_UNSAFE,
@@ -341,15 +345,14 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @return Byte array with serialized object.
*/
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
- Vertex<I, V, E, M> vertex,
+ E extends Writable> byte[] writeVertexToByteArray(
+ Vertex<I, V, E> vertex,
byte[] buffer,
boolean unsafe,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
ExtendedDataOutput extendedDataOutput;
if (unsafe) {
extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
@@ -378,14 +381,13 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @return Byte array with serialized object.
*/
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
- Vertex<I, V, E, M> vertex,
+ E extends Writable> byte[] writeVertexToByteArray(
+ Vertex<I, V, E> vertex,
boolean unsafe,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
return writeVertexToByteArray(vertex, null, unsafe, conf);
}
@@ -400,16 +402,14 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @param conf Configuration
*/
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> void
- reinitializeVertexFromByteArray(
+ E extends Writable> void reinitializeVertexFromByteArray(
byte[] byteArray,
- Vertex<I, V, E, M> vertex,
+ Vertex<I, V, E> vertex,
boolean unsafe,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
ExtendedDataInput extendedDataInput;
if (unsafe) {
extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
@@ -465,15 +465,14 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> void reinitializeVertexFromDataInput(
+ E extends Writable> void reinitializeVertexFromDataInput(
DataInput input,
- Vertex<I, V, E, M> vertex,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ Vertex<I, V, E> vertex,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf)
throws IOException {
vertex.getId().readFields(input);
vertex.getValue().readFields(input);
@@ -493,17 +492,16 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @return The vertex
* @throws IOException
*/
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> Vertex<I, V, E, M>
+ E extends Writable> Vertex<I, V, E>
readVertexFromDataInput(
DataInput input,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ ImmutableClassesGiraphConfiguration<I, V, E> conf)
throws IOException {
- Vertex<I, V, E, M> vertex = conf.createVertex();
+ Vertex<I, V, E> vertex = conf.createVertex();
I id = conf.createVertexId();
V value = conf.createVertexValue();
OutEdges<I, E> edges = conf.createOutEdges();
@@ -521,19 +519,56 @@ public class WritableUtils {
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> void writeVertexToDataOutput(
+ E extends Writable> void writeVertexToDataOutput(
DataOutput output,
- Vertex<I, V, E, M> vertex,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ Vertex<I, V, E> vertex,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf)
throws IOException {
vertex.getId().write(output);
vertex.getValue().write(output);
((OutEdges<I, E>) vertex.getEdges()).write(output);
output.writeBoolean(vertex.isHalted());
}
+
+ /**
+ * Write a boolean presence flag, then the class name if clazz is non-null.
+ *
+ * @param clazz Class to write, or null to write only the (false) flag
+ * @param output Data output to write to
+ * @param <T> Class type
+ */
+ public static <T> void writeClass(Class<T> clazz,
+ DataOutput output) throws IOException {
+ output.writeBoolean(clazz != null);
+ if (clazz != null) {
+ output.writeUTF(clazz.getName());
+ }
+ }
+
+ /**
+ * Read class from data input; a missing class raises IllegalStateException.
+ * Matches {@link #writeClass(Class, DataOutput)}.
+ *
+ * @param input Data input
+ * @param <T> Class type
+ * @return Class, or null if null was written
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Class<T> readClass(DataInput input) throws IOException {
+ if (input.readBoolean()) {
+ String className = input.readUTF();
+ try {
+ return (Class<T>) Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("readClass: No class found " +
+ className, e);
+ }
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 03a4876..8b5e39a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -35,7 +35,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
@@ -45,6 +44,7 @@ import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
@@ -108,13 +108,12 @@ import java.util.concurrent.TimeUnit;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class BspServiceWorker<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends BspService<I, V, E, M>
- implements CentralizedServiceWorker<I, V, E, M>,
+ V extends Writable, E extends Writable>
+ extends BspService<I, V, E>
+ implements CentralizedServiceWorker<I, V, E>,
ResetSuperstepMetricsObserver {
/** Name of gauge for time spent waiting on other workers */
public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
@@ -125,12 +124,12 @@ public class BspServiceWorker<I extends WritableComparable,
/** Worker info */
private final WorkerInfo workerInfo;
/** Worker graph partitioner */
- private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+ private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
/** IPC Client */
- private final WorkerClient<I, V, E, M> workerClient;
+ private final WorkerClient<I, V, E> workerClient;
/** IPC Server */
- private final WorkerServer<I, V, E, M> workerServer;
+ private final WorkerServer<I, V, E> workerServer;
/** Request processor for aggregator requests */
private final WorkerAggregatorRequestProcessor
workerAggregatorRequestProcessor;
@@ -173,27 +172,28 @@ public class BspServiceWorker<I extends WritableComparable,
String serverPortList,
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
- GraphTaskManager<I, V, E, M> graphTaskManager)
+ GraphTaskManager<I, V, E> graphTaskManager)
throws IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
+ ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
workerInfo = new WorkerInfo();
- workerServer = new NettyWorkerServer<I, V, E, M>(conf, this, context);
+ workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
workerInfo.setInetSocketAddress(workerServer.getMyAddress());
workerInfo.setTaskId(getTaskPartition());
- workerClient = new NettyWorkerClient<I, V, E, M>(context, conf, this);
+ workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
workerAggregatorRequestProcessor =
new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
- workerContext = conf.createWorkerContext(null);
-
aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+ workerContext = conf.createWorkerContext();
+ workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+
superstepOutput = conf.createSuperstepOutput(context);
if (conf.isJMapHistogramDumpEnabled()) {
@@ -223,7 +223,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
@Override
- public WorkerClient<I, V, E, M> getWorkerClient() {
+ public WorkerClient<I, V, E> getWorkerClient() {
return workerClient;
}
@@ -285,7 +285,8 @@ public class BspServiceWorker<I extends WritableComparable,
/**
- * Load the vertices from the user-defined {@link VertexReader}
+ * Load the vertices from the user-defined
+ * {@link org.apache.giraph.io.VertexReader}
*
* @return Count of vertices and edges loaded
*/
@@ -295,10 +296,6 @@ public class BspServiceWorker<I extends WritableComparable,
getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
false, false, true);
- GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
- INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
- null, null);
-
InputSplitPathOrganizer splitOrganizer =
new InputSplitPathOrganizer(getZkExt(),
inputSplitPathList, getWorkerInfo().getHostname(),
@@ -310,11 +307,10 @@ public class BspServiceWorker<I extends WritableComparable,
BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
- VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
- new VertexInputSplitsCallableFactory<I, V, E, M>(
+ VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+ new VertexInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedVertexInputFormat(),
getContext(),
- graphState,
getConfiguration(),
this,
splitsHandler,
@@ -324,7 +320,8 @@ public class BspServiceWorker<I extends WritableComparable,
}
/**
- * Load the edges from the user-defined {@link EdgeReader}.
+ * Load the edges from the user-defined
+ * {@link org.apache.giraph.io.EdgeReader}.
*
* @return Number of edges loaded
*/
@@ -333,10 +330,6 @@ public class BspServiceWorker<I extends WritableComparable,
getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
false, false, true);
- GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
- INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
- null, null);
-
InputSplitPathOrganizer splitOrganizer =
new InputSplitPathOrganizer(getZkExt(),
inputSplitPathList, getWorkerInfo().getHostname(),
@@ -348,11 +341,10 @@ public class BspServiceWorker<I extends WritableComparable,
BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
- EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
- new EdgeInputSplitsCallableFactory<I, V, E, M>(
+ EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+ new EdgeInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedEdgeInputFormat(),
getContext(),
- graphState,
getConfiguration(),
this,
splitsHandler,
@@ -483,11 +475,8 @@ public class BspServiceWorker<I extends WritableComparable,
}
// Add the partitions that this worker owns
- GraphState<I, V, E, M> graphState =
- new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
- getContext(), getGraphTaskManager(), null, null);
Collection<? extends PartitionOwner> masterSetPartitionOwners =
- startSuperstep(graphState);
+ startSuperstep();
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
@@ -552,7 +541,7 @@ else[HADOOP_NON_SECURE]*/
if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
!getPartitionStore().hasPartition(
partitionOwner.getPartitionId())) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getConfiguration().createPartition(
partitionOwner.getPartitionId(), getContext());
getPartitionStore().addPartition(partition);
@@ -569,7 +558,7 @@ else[HADOOP_NON_SECURE]*/
List<PartitionStats> partitionStatsList =
new ArrayList<PartitionStats>();
for (Integer partitionId : getPartitionStore().getPartitionIds()) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getPartitionStore().getPartition(partitionId);
PartitionStats partitionStats =
new PartitionStats(partition.getId(),
@@ -583,7 +572,7 @@ else[HADOOP_NON_SECURE]*/
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, getPartitionStore());
- return finishSuperstep(graphState, partitionStatsList);
+ return finishSuperstep(partitionStatsList);
}
/**
@@ -666,8 +655,7 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public Collection<? extends PartitionOwner> startSuperstep(
- GraphState<I, V, E, M> graphState) {
+ public Collection<? extends PartitionOwner> startSuperstep() {
// Algorithm:
// 1. Communication service will combine message from previous
// superstep
@@ -675,7 +663,7 @@ else[HADOOP_NON_SECURE]*/
// 3. Wait until the partition assignment is complete and get it
// 4. Get the aggregator values from the previous superstep
if (getSuperstep() != INPUT_SUPERSTEP) {
- workerServer.prepareSuperstep(graphState);
+ workerServer.prepareSuperstep();
}
registerHealth(getSuperstep());
@@ -727,7 +715,6 @@ else[HADOOP_NON_SECURE]*/
@Override
public FinishedSuperstepStats finishSuperstep(
- GraphState<I, V, E, M> graphState,
List<PartitionStats> partitionStatsList) {
// This barrier blocks until success (or the master signals it to
// restart).
@@ -740,10 +727,10 @@ else[HADOOP_NON_SECURE]*/
// 4. Report the statistics (vertices, edges, messages, etc.)
// of this worker
// 5. Let the master know it is finished.
- // 6. Wait for the master's global stats, and check if done
+ // 6. Wait for the master's superstep info, and check if done
waitForRequestsToFinish();
- graphState.getGraphTaskManager().notifyFinishedCommunication();
+ getGraphTaskManager().notifyFinishedCommunication();
long workerSentMessages = 0;
long localVertices = 0;
@@ -753,7 +740,7 @@ else[HADOOP_NON_SECURE]*/
}
if (getSuperstep() != INPUT_SUPERSTEP) {
- postSuperstepCallbacks(graphState);
+ postSuperstepCallbacks();
}
aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -779,17 +766,21 @@ else[HADOOP_NON_SECURE]*/
waitForOtherWorkers(superstepFinishedNode);
GlobalStats globalStats = new GlobalStats();
+ SuperstepClasses superstepClasses = new SuperstepClasses();
WritableUtils.readFieldsFromZnode(
- getZkExt(), superstepFinishedNode, false, null, globalStats);
+ getZkExt(), superstepFinishedNode, false, null, globalStats,
+ superstepClasses);
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
- " with global stats " + globalStats);
+ " with global stats " + globalStats + " and classes " +
+ superstepClasses);
}
incrCachedSuperstep();
getContext().setStatus("finishSuperstep: (all workers done) " +
getGraphTaskManager().getGraphFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
+ getConfiguration().updateSuperstepClasses(superstepClasses);
return new FinishedSuperstepStats(
localVertices,
@@ -801,18 +792,15 @@ else[HADOOP_NON_SECURE]*/
/**
* Handle post-superstep callbacks
- *
- * @param graphState GraphState
*/
- private void postSuperstepCallbacks(GraphState<I, V, E, M> graphState) {
- getWorkerContext().setGraphState(graphState);
+ private void postSuperstepCallbacks() {
GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
getWorkerContext().postSuperstep();
timerContext.stop();
getContext().progress();
for (WorkerObserver obs : getWorkerObservers()) {
- obs.postSuperstep(graphState.getSuperstep());
+ obs.postSuperstep(getSuperstep());
getContext().progress();
}
}
@@ -943,9 +931,7 @@ else[HADOOP_NON_SECURE]*/
public Void call() throws Exception {
VertexWriter<I, V, E> vertexWriter =
vertexOutputFormat.createVertexWriter(getContext());
- vertexWriter.setConf(
- (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
- getConfiguration());
+ vertexWriter.setConf(getConfiguration());
vertexWriter.initialize(getContext());
long verticesWritten = 0;
long nextPrintVertices = 0;
@@ -953,9 +939,9 @@ else[HADOOP_NON_SECURE]*/
int partitionIndex = 0;
int numPartitions = getPartitionStore().getNumPartitions();
for (Integer partitionId : getPartitionStore().getPartitionIds()) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getPartitionStore().getPartition(partitionId);
- for (Vertex<I, V, E, M> vertex : partition) {
+ for (Vertex<I, V, E> vertex : partition) {
vertexWriter.writeVertex(vertex);
++verticesWritten;
@@ -1104,7 +1090,7 @@ else[HADOOP_NON_SECURE]*/
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
for (Integer partitionId : getPartitionStore().getPartitionIds()) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getPartitionStore().getPartition(partitionId);
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
@@ -1209,7 +1195,7 @@ else[HADOOP_NON_SECURE]*/
" not found!");
}
metadataStream.close();
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getConfiguration().createPartition(partitionId, getContext());
DataInputStream partitionsStream =
getFs().open(new Path(partitionsFile));
@@ -1250,18 +1236,21 @@ else[HADOOP_NON_SECURE]*/
" total.");
}
- // Load global statistics
- GlobalStats globalStats = null;
+ // Load global stats and superstep classes
+ GlobalStats globalStats = new GlobalStats();
+ SuperstepClasses superstepClasses = new SuperstepClasses();
String finalizedCheckpointPath =
getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
try {
DataInputStream finalizedStream =
getFs().open(new Path(finalizedCheckpointPath));
- globalStats = new GlobalStats();
globalStats.readFields(finalizedStream);
+ superstepClasses.readFields(finalizedStream);
+ getConfiguration().updateSuperstepClasses(superstepClasses);
} catch (IOException e) {
throw new IllegalStateException(
- "loadCheckpoint: Failed to load global statistics", e);
+ "loadCheckpoint: Failed to load global stats and superstep classes",
+ e);
}
// Communication service needs to setup the connections prior to
@@ -1287,13 +1276,13 @@ else[HADOOP_NON_SECURE]*/
new ArrayList<Entry<WorkerInfo, List<Integer>>>(
workerPartitionMap.entrySet());
Collections.shuffle(randomEntryList);
- WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
- new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
+ WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+ new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
getConfiguration(), this);
for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
randomEntryList) {
for (Integer partitionId : workerPartitionList.getValue()) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
getPartitionStore().removePartition(partitionId);
if (partition == null) {
throw new IllegalStateException(
@@ -1467,7 +1456,7 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public PartitionStore<I, V, E, M> getPartitionStore() {
+ public PartitionStore<I, V, E> getPartitionStore() {
return getServerData().getPartitionStore();
}
@@ -1493,7 +1482,7 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public ServerData<I, V, E, M> getServerData() {
+ public ServerData<I, V, E> getServerData() {
return workerServer.getServerData();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 351a114..78cdd8e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -20,7 +20,6 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
@@ -48,11 +47,10 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class EdgeInputSplitsCallable<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends InputSplitsCallable<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends InputSplitsCallable<I, V, E> {
/** How often to update metrics and print info */
public static final int EDGES_UPDATE_PERIOD = 1000000;
/** How often to update filtered metrics */
@@ -80,7 +78,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
*
* @param edgeInputFormat Edge input format
* @param context Context
- * @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
@@ -89,13 +86,12 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
public EdgeInputSplitsCallable(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ BspServiceWorker<I, V, E> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
- super(context, graphState, configuration, bspServiceWorker,
- splitsHandler, zooKeeperExt);
+ super(context, configuration, bspServiceWorker, splitsHandler,
+ zooKeeperExt);
this.edgeInputFormat = edgeInputFormat;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
@@ -116,20 +112,18 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
* maximum number of edges to be read from an input split.
*
* @param inputSplit Input split to process with edge reader
- * @param graphState Current graph state
* @return Edges loaded from this input split
* @throws IOException
* @throws InterruptedException
*/
@Override
protected VertexEdgeCount readInputSplit(
- InputSplit inputSplit,
- GraphState<I, V, E, M> graphState) throws IOException,
+ InputSplit inputSplit) throws IOException,
InterruptedException {
EdgeReader<I, E> edgeReader =
edgeInputFormat.createEdgeReader(inputSplit, context);
edgeReader.setConf(
- (ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>)
+ (ImmutableClassesGiraphConfiguration<I, Writable, E>)
configuration);
edgeReader.initialize(inputSplit, context);
@@ -166,8 +160,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
continue;
}
- graphState.getWorkerClientRequestProcessor().sendEdgeRequest(sourceId,
- readerEdge);
+ workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
context.progress(); // do this before potential data transfer
// Update status every EDGES_UPDATE_PERIOD edges
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 33fb515..f68ac93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -19,7 +19,6 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.utils.CallableFactory;
@@ -34,21 +33,18 @@ import org.apache.hadoop.mapreduce.Mapper;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
implements CallableFactory<VertexEdgeCount> {
/** Edge input format */
private final EdgeInputFormat<I, E> edgeInputFormat;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
- /** Graph state. */
- private final GraphState<I, V, E, M> graphState;
/** Configuration. */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** {@link BspServiceWorker} we're running on. */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
private final InputSplitsHandler splitsHandler;
/** {@link ZooKeeperExt} for this worker. */
@@ -59,7 +55,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
*
* @param edgeInputFormat Edge input format
* @param context Mapper context
- * @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
@@ -68,14 +63,12 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
public EdgeInputSplitsCallableFactory(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ BspServiceWorker<I, V, E> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
this.edgeInputFormat = edgeInputFormat;
this.context = context;
- this.graphState = graphState;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
this.zooKeeperExt = zooKeeperExt;
@@ -83,11 +76,10 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
}
@Override
- public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
- return new EdgeInputSplitsCallable<I, V, E, M>(
+ public InputSplitsCallable<I, V, E> newCallable(int threadId) {
+ return new EdgeInputSplitsCallable<I, V, E>(
edgeInputFormat,
context,
- graphState,
configuration,
bspServiceWorker,
splitsHandler,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index a8298c5..10b1a25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -21,7 +21,6 @@ package org.apache.giraph.worker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.metrics.GiraphMetrics;
@@ -57,24 +56,20 @@ import java.util.concurrent.Callable;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public abstract class InputSplitsCallable<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
implements Callable<VertexEdgeCount> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
/** Class time object */
private static final Time TIME = SystemTime.get();
/** Configuration */
- protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
- configuration;
+ protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Context */
protected final Mapper<?, ?, ?, ?>.Context context;
- /** Graph state */
- private final GraphState<I, V, E, M> graphState;
/** Handles IPC communication */
- private final WorkerClientRequestProcessor<I, V, E, M>
+ protected final WorkerClientRequestProcessor<I, V, E>
workerClientRequestProcessor;
/**
* Stores and processes the list of InputSplits advertised
@@ -93,7 +88,6 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* Constructor.
*
* @param context Context
- * @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
@@ -101,20 +95,15 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
*/
public InputSplitsCallable(
Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ BspServiceWorker<I, V, E> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
this.zooKeeperExt = zooKeeperExt;
this.context = context;
this.workerClientRequestProcessor =
- new NettyWorkerClientRequestProcessor<I, V, E, M>(
+ new NettyWorkerClientRequestProcessor<I, V, E>(
context, configuration, bspServiceWorker);
- this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
- graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
- context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
- null);
this.useLocality = configuration.useInputSplitLocality();
this.splitsHandler = splitsHandler;
this.configuration = configuration;
@@ -205,14 +194,11 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* Load vertices/edges from the given input split.
*
* @param inputSplit Input split to load
- * @param graphState Graph state
* @return Count of vertices and edges loaded
* @throws IOException
* @throws InterruptedException
*/
- protected abstract VertexEdgeCount readInputSplit(
- InputSplit inputSplit,
- GraphState<I, V, E, M> graphState)
+ protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
throws IOException, InterruptedException;
@Override
@@ -222,9 +208,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
int inputSplitsProcessed = 0;
try {
while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
- vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
- loadInputSplit(inputSplitPath,
- graphState));
+ vertexEdgeCount =
+ vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
context.progress();
++inputSplitsProcessed;
}
@@ -267,7 +252,6 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* Mark the input split finished when done.
*
* @param inputSplitPath ZK location of input split
- * @param graphState Current graph state
* @return Mapping of vertex indices and statistics, or null if no data read
* @throws IOException
* @throws ClassNotFoundException
@@ -276,13 +260,11 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* @throws IllegalAccessException
*/
private VertexEdgeCount loadInputSplit(
- String inputSplitPath,
- GraphState<I, V, E, M> graphState)
+ String inputSplitPath)
throws IOException, ClassNotFoundException, InterruptedException,
InstantiationException, IllegalAccessException {
InputSplit inputSplit = getInputSplit(inputSplitPath);
- VertexEdgeCount vertexEdgeCount =
- readInputSplit(inputSplit, graphState);
+ VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
if (LOG.isInfoEnabled()) {
LOG.info("loadFromInputSplit: Finished loading " +
inputSplitPath + " " + vertexEdgeCount);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 1c292ad..977e100 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -19,7 +19,6 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
@@ -50,11 +49,10 @@ import java.io.IOException;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class VertexInputSplitsCallable<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends InputSplitsCallable<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends InputSplitsCallable<I, V, E> {
/** How often to update metrics and print info */
public static final int VERTICES_UPDATE_PERIOD = 250000;
/** How often to update filtered out metrics */
@@ -68,9 +66,9 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
/** Input split max vertices (-1 denotes all) */
private final long inputSplitMaxVertices;
/** Bsp service worker (only use thread-safe methods) */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Filter to select which vertices to keep */
- private final VertexInputFilter<I, V, E, M> vertexInputFilter;
+ private final VertexInputFilter<I, V, E> vertexInputFilter;
// Metrics
/** number of vertices loaded meter across all readers */
@@ -85,7 +83,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
*
* @param vertexInputFormat Vertex input format
* @param context Context
- * @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
@@ -94,13 +91,12 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
public VertexInputSplitsCallable(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ BspServiceWorker<I, V, E> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
- super(context, graphState, configuration, bspServiceWorker,
- splitsHandler, zooKeeperExt);
+ super(context, configuration, bspServiceWorker, splitsHandler,
+ zooKeeperExt);
this.vertexInputFormat = vertexInputFormat;
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -123,20 +119,16 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* maximum number of vertices to be read from an input split.
*
* @param inputSplit Input split to process with vertex reader
- * @param graphState Current graph state
* @return Vertices and edges loaded from this input split
* @throws IOException
* @throws InterruptedException
*/
@Override
protected VertexEdgeCount readInputSplit(
- InputSplit inputSplit,
- GraphState<I, V, E, M> graphState)
- throws IOException, InterruptedException {
+ InputSplit inputSplit) throws IOException, InterruptedException {
VertexReader<I, V, E> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, context);
- vertexReader.setConf(
- (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration);
+ vertexReader.setConf(configuration);
vertexReader.initialize(inputSplit, context);
long inputSplitVerticesLoaded = 0;
@@ -146,8 +138,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
long inputSplitEdgesLoaded = 0;
while (vertexReader.nextVertex()) {
- Vertex<I, V, E, M> readerVertex =
- (Vertex<I, V, E, M>) vertexReader.getCurrentVertex();
+ Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
if (readerVertex.getId() == null) {
throw new IllegalArgumentException(
"readInputSplit: Vertex reader returned a vertex " +
@@ -157,7 +148,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
readerVertex.setValue(configuration.createVertexValue());
}
readerVertex.setConf(configuration);
- readerVertex.setGraphState(graphState);
++inputSplitVerticesLoaded;
@@ -172,7 +162,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
PartitionOwner partitionOwner =
bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
- graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+ workerClientRequestProcessor.sendVertexRequest(
partitionOwner, readerVertex);
context.progress(); // do this before potential data transfer
edgesSinceLastUpdate += readerVertex.getNumEdges();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index cf5e8ad..c9893d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -19,7 +19,6 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.utils.CallableFactory;
@@ -34,21 +33,18 @@ import org.apache.hadoop.mapreduce.Mapper;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class VertexInputSplitsCallableFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
implements CallableFactory<VertexEdgeCount> {
/** Vertex input format */
private final VertexInputFormat<I, V, E> vertexInputFormat;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
- /** Graph state. */
- private final GraphState<I, V, E, M> graphState;
/** Configuration. */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** {@link BspServiceWorker} we're running on. */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
private final InputSplitsHandler splitsHandler;
/** {@link ZooKeeperExt} for this worker. */
@@ -59,7 +55,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
*
* @param vertexInputFormat Vertex input format
* @param context Mapper context
- * @param graphState Graph state
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
@@ -68,14 +63,12 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
public VertexInputSplitsCallableFactory(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ BspServiceWorker<I, V, E> bspServiceWorker,
InputSplitsHandler splitsHandler,
ZooKeeperExt zooKeeperExt) {
this.vertexInputFormat = vertexInputFormat;
this.context = context;
- this.graphState = graphState;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
this.zooKeeperExt = zooKeeperExt;
@@ -83,11 +76,10 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
}
@Override
- public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
- return new VertexInputSplitsCallable<I, V, E, M>(
+ public InputSplitsCallable<I, V, E> newCallable(int threadId) {
+ return new VertexInputSplitsCallable<I, V, E>(
vertexInputFormat,
context,
- graphState,
configuration,
bspServiceWorker,
splitsHandler,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 9a8a8b8..9bfd7b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -64,7 +64,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
private Map<String, Aggregator<Writable>> currentAggregatorMap =
Maps.newHashMap();
/** Service worker */
- private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+ private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
/** Progressable for reporting progress */
private final Progressable progressable;
/** How big a single aggregator request can be */
@@ -80,7 +80,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
* @param progressable Progressable for reporting progress
*/
public WorkerAggregatorHandler(
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+ CentralizedServiceWorker<?, ?, ?> serviceWorker,
ImmutableClassesGiraphConfiguration conf,
Progressable progressable) {
this.serviceWorker = serviceWorker;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index d3ffaea..729ba14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.mapreduce.Mapper;
public abstract class WorkerContext implements WorkerAggregatorUsage {
/** Global graph state */
private GraphState graphState;
+ /** Worker aggregator usage */
+ private WorkerAggregatorUsage workerAggregatorUsage;
/**
* Set the graph state.
@@ -41,6 +43,16 @@ public abstract class WorkerContext implements WorkerAggregatorUsage {
}
/**
+ * Set worker aggregator usage
+ *
+ * @param workerAggregatorUsage Worker aggregator usage
+ */
+ public void setWorkerAggregatorUsage(
+ WorkerAggregatorUsage workerAggregatorUsage) {
+ this.workerAggregatorUsage = workerAggregatorUsage;
+ }
+
+ /**
* Initialize the WorkerContext.
* This method is executed once on each Worker before the first
* superstep starts.
@@ -112,11 +124,11 @@ public abstract class WorkerContext implements WorkerAggregatorUsage {
@Override
public <A extends Writable> void aggregate(String name, A value) {
- graphState.getWorkerAggregatorUsage().aggregate(name, value);
+ workerAggregatorUsage.aggregate(name, value);
}
@Override
public <A extends Writable> A getAggregatedValue(String name) {
- return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
+ return workerAggregatorUsage.<A>getAggregatedValue(name);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index 91b842a..e771e36 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -25,8 +25,8 @@ import org.apache.giraph.comm.netty.handler.RequestServerHandler;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -46,17 +46,10 @@ public class ConnectionTest {
/** Class configuration */
private ImmutableClassesGiraphConfiguration conf;
- public static class IntVertex extends Vertex<IntWritable,
- IntWritable, IntWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- }
- }
-
@Before
public void setUp() {
GiraphConfiguration tmpConfig = new GiraphConfiguration();
- tmpConfig.setVertexClass(IntVertex.class);
+ tmpConfig.setComputationClass(IntNoOpComputation.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
}
@@ -71,7 +64,7 @@ public class ConnectionTest {
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
- ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ ServerData<IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
WorkerInfo workerInfo = new WorkerInfo();
NettyServer server =
@@ -100,7 +93,7 @@ public class ConnectionTest {
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
- ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ ServerData<IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
RequestServerHandler.Factory requestServerHandlerFactory =
new WorkerRequestServerHandler.Factory(serverData);
@@ -150,7 +143,7 @@ public class ConnectionTest {
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
- ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ ServerData<IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
WorkerInfo workerInfo = new WorkerInfo();
NettyServer server = new NettyServer(conf,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 58aa7d1..5c69161 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -26,8 +26,8 @@ import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
@@ -51,7 +51,7 @@ public class RequestFailureTest {
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
/** Server data */
- private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ private ServerData<IntWritable, IntWritable, IntWritable>
serverData;
/** Server */
private NettyServer server;
@@ -60,21 +60,11 @@ public class RequestFailureTest {
/** Mock context */
private Context context;
- /**
- * Only for testing.
- */
- public static class TestVertex extends Vertex<IntWritable,
- IntWritable, IntWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- }
- }
-
@Before
public void setUp() throws IOException {
// Setup the conf
GiraphConfiguration tmpConf = new GiraphConfiguration();
- tmpConf.setVertexClass(TestVertex.class);
+ tmpConf.setComputationClass(IntNoOpComputation.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConf);
context = mock(Context.class);
@@ -91,7 +81,8 @@ public class RequestFailureTest {
dataToSend.initialize();
ByteArrayVertexIdMessages<IntWritable,
IntWritable> vertexIdMessages =
- new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+ new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
+ IntWritable.class);
vertexIdMessages.setConf(conf);
vertexIdMessages.initialize();
dataToSend.add(partitionId, vertexIdMessages);
@@ -117,7 +108,8 @@ public class RequestFailureTest {
for (IntWritable vertexId : vertices) {
keySum += vertexId.get();
Iterable<IntWritable> messages =
- serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+ serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+ vertexId);
synchronized (messages) {
for (IntWritable message : messages) {
messageSum += message.get();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index f1f8e26..7016572 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -35,6 +35,7 @@ import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
@@ -63,8 +64,7 @@ public class RequestTest {
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
/** Server data */
- private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- serverData;
+ private ServerData<IntWritable, IntWritable, IntWritable> serverData;
/** Server */
private NettyServer server;
/** Client */
@@ -72,21 +72,11 @@ public class RequestTest {
/** Worker info */
private WorkerInfo workerInfo;
- /**
- * Only for testing.
- */
- public static class TestVertex extends Vertex<IntWritable,
- IntWritable, IntWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- }
- }
-
@Before
public void setUp() throws IOException {
// Setup the conf
GiraphConfiguration tmpConf = new GiraphConfiguration();
- GiraphConstants.VERTEX_CLASS.set(tmpConf, TestVertex.class);
+ GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConf);
@SuppressWarnings("rawtypes")
@@ -110,7 +100,7 @@ public class RequestTest {
public void sendVertexPartition() throws IOException {
// Data to send
int partitionId = 13;
- Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition =
+ Partition<IntWritable, IntWritable, IntWritable> partition =
conf.createPartition(partitionId, null);
for (int i = 0; i < 10; ++i) {
Vertex vertex = conf.createVertex();
@@ -119,10 +109,8 @@ public class RequestTest {
}
// Send the request
- SendVertexRequest<IntWritable, IntWritable, IntWritable,
- IntWritable> request =
- new SendVertexRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partition);
+ SendVertexRequest<IntWritable, IntWritable, IntWritable> request =
+ new SendVertexRequest<IntWritable, IntWritable, IntWritable>(partition);
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
@@ -131,15 +119,13 @@ public class RequestTest {
server.stop();
// Check the output
- PartitionStore<IntWritable, IntWritable,
- IntWritable, IntWritable> partitionStore =
+ PartitionStore<IntWritable, IntWritable, IntWritable> partitionStore =
serverData.getPartitionStore();
assertTrue(partitionStore.hasPartition(partitionId));
int total = 0;
- Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition2 =
+ Partition<IntWritable, IntWritable, IntWritable> partition2 =
partitionStore.getPartition(partitionId);
- for (Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable> vertex : partition2) {
+ for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
total += vertex.getId().get();
}
partitionStore.putPartition(partition2);
@@ -158,7 +144,8 @@ public class RequestTest {
int partitionId = 0;
ByteArrayVertexIdMessages<IntWritable,
IntWritable> vertexIdMessages =
- new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+ new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
+ IntWritable.class);
vertexIdMessages.setConf(conf);
vertexIdMessages.initialize();
dataToSend.add(partitionId, vertexIdMessages);
@@ -187,7 +174,8 @@ public class RequestTest {
for (IntWritable vertexId : vertices) {
keySum += vertexId.get();
Iterable<IntWritable> messages =
- serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+ serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+ vertexId);
synchronized (messages) {
for (IntWritable message : messages) {
messageSum += message.get();
@@ -203,12 +191,11 @@ public class RequestTest {
// Data to send
int partitionId = 19;
Map<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> vertexIdMutations =
+ IntWritable>> vertexIdMutations =
Maps.newHashMap();
for (int i = 0; i < 11; ++i) {
- VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
- mutations = new VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>();
+ VertexMutations<IntWritable, IntWritable, IntWritable> mutations =
+ new VertexMutations<IntWritable, IntWritable, IntWritable>();
for (int j = 0; j < 3; ++j) {
Vertex vertex = conf.createVertex();
vertex.initialize(new IntWritable(i), new IntWritable(j));
@@ -229,10 +216,9 @@ public class RequestTest {
}
// Send the request
- SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
- IntWritable> request =
- new SendPartitionMutationsRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partitionId, vertexIdMutations);
+ SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable>
+ request = new SendPartitionMutationsRequest<IntWritable, IntWritable,
+ IntWritable>(partitionId, vertexIdMutations);
GiraphMetrics.init(conf);
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
@@ -243,16 +229,16 @@ public class RequestTest {
// Check the output
ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> inVertexIdMutations =
+ IntWritable>> inVertexIdMutations =
serverData.getVertexMutations();
int keySum = 0;
for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
- IntWritable, IntWritable>> entry :
+ IntWritable>> entry :
inVertexIdMutations.entrySet()) {
synchronized (entry.getValue()) {
keySum += entry.getKey().get();
int vertexValueSum = 0;
- for (Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
+ for (Vertex<IntWritable, IntWritable, IntWritable>
vertex : entry.getValue().getAddedVertexList()) {
vertexValueSum += vertex.getValue().get();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index c27156f..c026cf8 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -25,7 +25,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
@@ -48,17 +48,10 @@ public class SaslConnectionTest {
/** Class configuration */
private ImmutableClassesGiraphConfiguration conf;
- public static class IntVertex extends Vertex<IntWritable,
- IntWritable, IntWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- }
- }
-
@Before
public void setUp() {
GiraphConfiguration tmpConfig = new GiraphConfiguration();
- tmpConfig.setVertexClass(IntVertex.class);
+ tmpConfig.setComputationClass(IntNoOpComputation.class);
GiraphConstants.AUTHENTICATE.set(tmpConfig, true);
conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
}
@@ -74,7 +67,7 @@ public class SaslConnectionTest {
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
- ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ ServerData<IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
SaslServerHandler.Factory mockedSaslServerFactory =