You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/05/09 20:47:34 UTC
svn commit: r1336344 [2/2] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
src/test/java/org/apache/giraph/comm/ s...
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java Wed May 9 18:47:32 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Request carrying the vertex mutations (keyed by vertex id) for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SendPartitionMutationsRequest<I extends WritableComparable,
+ V extends Writable, E extends Writable,
+ M extends Writable> implements WritableRequest<I, V, E, M> {
+ /** Class logger (not referenced anywhere in this class) */
+ private static final Logger LOG =
+ Logger.getLogger(SendPartitionMutationsRequest.class);
+ /** Partition id (serialized with the request; not read by doRequest) */
+ private int partitionId;
+ /** Mutations sent for a partition, keyed by vertex id */
+ private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
+ /** Configuration used by readFields to create ids and mutations */
+ private Configuration conf;
+
+ /**
+ * Constructor used for reflection only (fields are filled by readFields)
+ */
+ public SendPartitionMutationsRequest() { }
+
+ /**
+ * Constructor used to send request.
+ *
+ * @param partitionId Partition to send the request to
+ * @param vertexIdMutations Map of vertex id to mutations to send
+ */
+ public SendPartitionMutationsRequest(int partitionId,
+ Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
+ this.partitionId = partitionId;
+ this.vertexIdMutations = vertexIdMutations;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ partitionId = input.readInt();
+ int vertexIdMutationsSize = input.readInt();
+ vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
+ for (int i = 0; i < vertexIdMutationsSize; ++i) {
+ I vertexId = BspUtils.createVertexIndex(conf);
+ vertexId.readFields(input);
+ VertexMutations<I, V, E, M> vertexMutations =
+ new VertexMutations<I, V, E, M>();
+ vertexMutations.setConf(conf);
+ vertexMutations.readFields(input);
+ if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
+ throw new IllegalStateException(
+ "readFields: Already has vertex id " + vertexId);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(partitionId);
+ output.writeInt(vertexIdMutations.size());
+ for (Entry<I, VertexMutations<I, V, E, M>> entry :
+ vertexIdMutations.entrySet()) {
+ entry.getKey().write(output);
+ entry.getValue().write(output);
+ }
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SEND_PARTITION_MUTATIONS_REQUEST;
+ }
+
+ @Override
+ public void doRequest(ServerData<I, V, E, M> serverData) {
+ ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
+ serverData.getVertexMutations();
+ for (Entry<I, VertexMutations<I, V, E, M>> entry :
+ vertexIdMutations.entrySet()) {
+ VertexMutations<I, V, E, M> mutations =
+ vertexMutations.get(entry.getKey());
+ if (mutations == null) { // No mutations stored for this vertex id yet
+ mutations = vertexMutations.putIfAbsent(
+ entry.getKey(), entry.getValue());
+ if (mutations == null) { // Ours was stored; nothing left to merge
+ continue;
+ }
+ }
+ synchronized (mutations) { // Merge into the mutations already stored
+ mutations.addVertexMutations(entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Wed May 9 18:47:32 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Request carrying a collection of vertices for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SendVertexRequest<I extends WritableComparable,
+ V extends Writable, E extends Writable,
+ M extends Writable> implements WritableRequest<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SendVertexRequest.class);
+ /** Partition id, used by doRequest as the key into the server's map */
+ private int partitionId;
+ /** Vertices to be stored on this partition */
+ private Collection<BasicVertex<I, V, E, M>> vertices;
+ /** Configuration used by readFields to create vertices */
+ private Configuration conf;
+
+ /**
+ * Constructor used for reflection only (fields are filled by readFields)
+ */
+ public SendVertexRequest() { }
+
+ /**
+ * Constructor for sending a request.
+ *
+ * @param partitionId Partition to send the request to
+ * @param vertices Vertices to send
+ */
+ public SendVertexRequest(
+ int partitionId, Collection<BasicVertex<I, V, E, M>> vertices) {
+ this.partitionId = partitionId;
+ this.vertices = vertices;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ partitionId = input.readInt();
+ int verticesCount = input.readInt();
+ vertices = Lists.newArrayListWithCapacity(verticesCount);
+ for (int i = 0; i < verticesCount; ++i) {
+ BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+ vertex.readFields(input);
+ vertices.add(vertex);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(partitionId);
+ output.writeInt(vertices.size());
+ for (BasicVertex<I, V, E, M> vertex : vertices) {
+ vertex.write(output);
+ }
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SEND_VERTEX_REQUEST;
+ }
+
+ @Override
+ public void doRequest(ServerData<I, V, E, M> serverData) {
+ ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+ partitionVertexMap = serverData.getPartitionVertexMap();
+ if (vertices.isEmpty()) {
+ LOG.warn("doRequest: Got an empty request!");
+ return;
+ }
+ Collection<BasicVertex<I, V, E, M>> vertexMap =
+ partitionVertexMap.get(partitionId);
+ if (vertexMap == null) { // No vertices stored for this partition yet
+ vertexMap = partitionVertexMap.putIfAbsent(partitionId, vertices);
+ if (vertexMap == null) { // Our collection was stored as the value
+ return;
+ }
+ }
+ synchronized (vertexMap) { // Append to the collection already stored
+ vertexMap.addAll(vertices);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
+
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Wed May 9 18:47:32 2012
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Holder for the concurrent maps that the server stores
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class ServerData<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Map of partition ids to incoming vertices from other workers.
+ * (Synchronized on values)
+ */
+ private final ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+ inPartitionVertexMap =
+ new ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>();
+ /**
+ * Map of inbound messages, mapping from vertex index to list of messages.
+ * Transferred to inMessages at beginning of a superstep. This
+ * intermediary step exists so that the combiner will run not only at the
+ * client, but also at the server. Also, allows the sending of large
+ * message lists during the superstep computation. (Synchronized on values)
+ */
+ private final ConcurrentHashMap<I, Collection<M>> transientMessages =
+ new ConcurrentHashMap<I, Collection<M>>();
+ /**
+ * Map of vertex ids to incoming vertex mutations from other workers.
+ * (Synchronized access to values)
+ */
+ private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+ vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
+
+ /**
+ * Get the map of partition id to vertices (synchronize on the values)
+ *
+ * @return Partition vertices
+ */
+ public ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+ getPartitionVertexMap() {
+ return inPartitionVertexMap;
+ }
+
+ /**
+ * Get the map of vertex id to messages (synchronize on the values)
+ *
+ * @return Vertex messages
+ */
+ public ConcurrentHashMap<I, Collection<M>> getTransientMessages() {
+ return transientMessages;
+ }
+
+ /**
+ * Get the map of vertex id to mutations (synchronize on the values)
+ *
+ * @return Vertex mutations
+ */
+ public ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+ getVertexMutations() {
+ return vertexMutations;
+ }
+}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed May 9 18:47:32 2012
@@ -27,8 +27,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
/**
* Public interface for workers to do message communication
@@ -39,9 +37,14 @@ import java.util.Map;
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface WorkerCommunications<I extends WritableComparable,
+public interface WorkerClient<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
/**
+ * Setup the client.
+ */
+ void setup();
+
+ /**
* Fix changes to the workers and the mapping between partitions and
* workers.
*/
@@ -100,11 +103,24 @@ public interface WorkerCommunications<I
void removeVertexReq(I vertexIndex) throws IOException;
/**
- * Get the vertices that were sent in the last iteration. After getting
- * the map, the user should synchronize with it to insure it
- * is thread-safe.
+ * Flush all outgoing messages. This will synchronously ensure that all
+ * messages have been sent and delivered prior to returning.
*
- * @return map of vertex ranges to vertices
+ * @throws IOException
+ */
+ void flush() throws IOException;
+
+ /**
+ * Get the number of messages sent during this superstep and reset it.
+ *
+ * @return Number of messages sent before the reset.
+ */
+ long resetMessageCount();
+
+ /**
+ * Closes all connections.
+ *
+ * @throws IOException
*/
- Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
+ void closeConnections() throws IOException;
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java Wed May 9 18:47:32 2012
@@ -18,55 +18,19 @@
package org.apache.giraph.comm;
-import java.io.Closeable;
-import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
/**
- * Interface for message communication server.
+ * Interface for both the client and the server
*
* @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
+ * @param <V> Vertex data
+ * @param <E> Edge data
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface ServerInterface<I extends WritableComparable,
+public interface WorkerClientServer<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends Closeable, WorkerCommunications<I, V, E, M> {
- /**
- * Setup the server.
- */
- void setup();
-
- /**
- * Move the in transition messages to the in messages for every vertex and
- * add new connections to any newly appearing RPC proxies.
- */
- void prepareSuperstep();
-
- /**
- * Flush all outgoing messages. This will synchronously ensure that all
- * messages have been send and delivered prior to returning.
- *
- * @param context Context used to signal process
- * @return Number of messages sent during the last superstep
- * @throws IOException
- */
- long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
-
- /**
- * Closes all connections.
- *
- * @throws IOException
- */
- void closeConnections() throws IOException;
-
- /**
- * Shuts down.
- */
- void close();
+ extends WorkerClient<I, V, E, M>, WorkerServer<I, V, E, M> {
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Wed May 9 18:47:32 2012
@@ -19,11 +19,12 @@
package org.apache.giraph.comm;
import java.io.Closeable;
-import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.giraph.graph.BasicVertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
/**
* Interface for message communication server.
@@ -34,13 +35,15 @@ import org.apache.hadoop.mapreduce.Mappe
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface ServerInterface<I extends WritableComparable,
+public interface WorkerServer<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends Closeable, WorkerCommunications<I, V, E, M> {
+ extends Closeable {
/**
- * Setup the server.
+ * Get the port
+ *
+ * @return Port used by this server
*/
- void setup();
+ int getPort();
/**
* Move the in transition messages to the in messages for every vertex and
@@ -49,21 +52,13 @@ public interface ServerInterface<I exten
void prepareSuperstep();
/**
- * Flush all outgoing messages. This will synchronously ensure that all
- * messages have been send and delivered prior to returning.
- *
- * @param context Context used to signal process
- * @return Number of messages sent during the last superstep
- * @throws IOException
- */
- long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
-
- /**
- * Closes all connections.
+ * Get the vertices that were sent in the last iteration. After getting
+ * the map, the user should synchronize with it to ensure it
+ * is thread-safe.
*
- * @throws IOException
+ * @return map of partition ids to vertices
*/
- void closeConnections() throws IOException;
+ Map<Integer, Collection<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
/**
* Shuts down.
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Wed May 9 18:47:32 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for requests to implement (each is Writable and Configurable)
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface WritableRequest<I extends WritableComparable,
+ V extends Writable, E extends Writable,
+ M extends Writable> extends Writable, Configurable {
+ /**
+ * Get the type of the request
+ *
+ * @return Request type, one of the RequestRegistry.Type values
+ */
+ Type getType();
+ /**
+ * Execute the request against the given server data
+ *
+ * @param serverData Accessible data that can be mutated per the request
+ */
+ void doRequest(ServerData<I, V, E, M> serverData);
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed May 9 18:47:32 2012
@@ -267,7 +267,7 @@ public abstract class BspService<I exten
/** File system */
private final FileSystem fs;
/** Checkpoint frequency */
- private int checkpointFrequency = -1;
+ private final int checkpointFrequency;
/** Map of aggregators */
private Map<String, Aggregator<Writable>> aggregatorMap =
new TreeMap<String, Aggregator<Writable>>();
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed May 9 18:47:32 2012
@@ -22,8 +22,10 @@ import net.iharder.Base64;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.NettyWorkerClientServer;
import org.apache.giraph.comm.RPCCommunications;
-import org.apache.giraph.comm.ServerInterface;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
@@ -99,7 +101,7 @@ public class BspServiceWorker<I extends
private final Map<PartitionOwner, Partition<I, V, E, M>>
inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
/** Communication service */
- private final ServerInterface<I, V, E, M> commService;
+ private final WorkerClientServer<I, V, E, M> commService;
/** Structure to store the partitions on this worker */
private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
new HashMap<Integer, Partition<I, V, E, M>>();
@@ -148,11 +150,22 @@ public class BspServiceWorker<I extends
GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
- RPCCommunications<I, V, E, M> rpcCommService =
- new RPCCommunications<I, V, E, M>(context, this, graphState);
+ boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+ GiraphJob.USE_NETTY_DEFAULT);
+ if (useNetty) {
+ commService = new NettyWorkerClientServer<I, V, E, M>(context, this);
+ } else {
+ commService =
+ new RPCCommunications<I, V, E, M>(context, this, graphState);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("BspServiceWorker: maxVerticesPerPartition = " +
+ maxVerticesPerPartition + " useNetty = " + useNetty);
+ }
+
workerInfo = new WorkerInfo(
- getHostname(), getTaskPartition(), rpcCommService.getPort());
- commService = rpcCommService;
+ getHostname(), getTaskPartition(), commService.getPort());
+
graphState.setWorkerCommunications(commService);
this.workerContext =
BspUtils.createWorkerContext(getConfiguration(),
@@ -311,6 +324,7 @@ public class BspServiceWorker<I extends
entry.getValue().getVertices().clear();
}
}
+ commService.flush();
inputSplitCache.clear();
return vertexEdgeCount;
@@ -954,7 +968,8 @@ public class BspServiceWorker<I extends
// 6. Wait for the master's global stats, and check if done
long workerSentMessages = 0;
try {
- workerSentMessages = commService.flush(getContext());
+ commService.flush();
+ workerSentMessages = commService.resetMessageCount();
} catch (IOException e) {
throw new IllegalStateException(
"finishSuperstep: flush failed", e);
@@ -1310,6 +1325,11 @@ public class BspServiceWorker<I extends
}
}
+ try {
+ getGraphMapper().getGraphState().getWorkerCommunications().flush();
+ } catch (IOException e) {
+ throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+ }
String myPartitionExchangeDonePath =
getPartitionExchangeWorkerPath(
getApplicationAttempt(), getSuperstep(), getWorkerInfo());
@@ -1410,11 +1430,11 @@ public class BspServiceWorker<I extends
* temporarily stored.
*/
private void movePartitionsToWorker(
- ServerInterface<I, V, E, M> commService) {
- Map<Integer, List<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+ WorkerServer<I, V, E, M> commService) {
+ Map<Integer, Collection<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
commService.getInPartitionVertexMap();
synchronized (inPartitionVertexMap) {
- for (Entry<Integer, List<BasicVertex<I, V, E, M>>> entry :
+ for (Entry<Integer, Collection<BasicVertex<I, V, E, M>>> entry :
inPartitionVertexMap.entrySet()) {
if (getPartitionMap().containsKey(entry.getKey())) {
throw new IllegalStateException(
@@ -1427,21 +1447,23 @@ public class BspServiceWorker<I extends
Partition<I, V, E, M> tmpPartition =
new Partition<I, V, E, M>(getConfiguration(),
entry.getKey());
- for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
- if (tmpPartition.putVertex(vertex) != null) {
- throw new IllegalStateException(
- "moveVerticesToWorker: Vertex " + vertex +
- " already exists!");
+ synchronized (entry.getValue()) {
+ for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+ if (tmpPartition.putVertex(vertex) != null) {
+ throw new IllegalStateException(
+ "moveVerticesToWorker: Vertex " + vertex +
+ " already exists!");
+ }
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("movePartitionsToWorker: Adding " +
+ entry.getValue().size() +
+ " vertices for partition id " + entry.getKey());
+ }
+ getPartitionMap().put(tmpPartition.getPartitionId(),
+ tmpPartition);
+ entry.getValue().clear();
}
- if (LOG.isInfoEnabled()) {
- LOG.info("moveVerticesToWorker: Adding " +
- entry.getValue().size() +
- " vertices for partition id " + entry.getKey());
- }
- getPartitionMap().put(tmpPartition.getPartitionId(),
- tmpPartition);
- entry.getValue().clear();
}
inPartitionVertexMap.clear();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed May 9 18:47:32 2012
@@ -151,6 +151,11 @@ public class GiraphJob {
/** Local ZooKeeper directory to use */
public static final String ZOOKEEPER_DIR = "giraph.zkDir";
+ /** Use the RPC communication or netty communication */
+ public static final String USE_NETTY = "giraph.useNetty";
+ /** Default is to use RPC, not netty */
+ public static final boolean USE_NETTY_DEFAULT = false;
+
/** Initial port to start using for the RPC communication */
public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
/** Default port to start using for the RPC communication */
@@ -182,18 +187,24 @@ public class GiraphJob {
public static final String MAX_VERTICES_PER_PARTITION =
"giraph.maxVerticesPerPartition";
/** Default maximum number of vertices per partition before sending. */
- public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
+ public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 10000;
/** Maximum number of messages per peer before flush */
public static final String MSG_SIZE = "giraph.msgSize";
/** Default maximum number of messages per peer before flush */
- public static final int MSG_SIZE_DEFAULT = 1000;
+ public static final int MSG_SIZE_DEFAULT = 2000;
+
+ /** Maximum number of mutations per partition before flush */
+ public static final String MAX_MUTATIONS_PER_REQUEST =
+ "giraph.maxMutationsPerRequest";
+ /** Default maximum number of mutations per partition before flush */
+ public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
/** Maximum number of messages that can be bulk sent during a flush */
public static final String MAX_MESSAGES_PER_FLUSH_PUT =
"giraph.maxMessagesPerFlushPut";
/** Default number of messages that can be bulk sent during a flush */
- public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
+ public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
/** Number of flush threads per peer */
public static final String MSG_NUM_FLUSH_THREADS =
@@ -273,8 +284,8 @@ public class GiraphJob {
public static final String CHECKPOINT_FREQUENCY =
"giraph.checkpointFrequency";
- /** Default checkpointing frequency of every 2 supersteps. */
- public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2;
+ /** Default checkpointing frequency of none. */
+ public static final int CHECKPOINT_FREQUENCY_DEFAULT = 0;
/**
* Delete checkpoints after a successful job run?
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Wed May 9 18:47:32 2012
@@ -17,7 +17,7 @@
*/
package org.apache.giraph.graph;
-import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.comm.WorkerClientServer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -45,7 +45,7 @@ E extends Writable, M extends Writable>
/** Graph-wide BSP Mapper for this Vertex */
private GraphMapper<I, V, E, M> graphMapper;
/** Graph-wide worker communications */
- private WorkerCommunications<I, V, E, M> workerCommunications;
+ private WorkerClientServer<I, V, E, M> workerCommunications;
public long getSuperstep() {
return superstep;
@@ -130,12 +130,12 @@ E extends Writable, M extends Writable>
* @return Returns this object.
*/
public GraphState<I, V, E, M> setWorkerCommunications(
- WorkerCommunications<I, V, E, M> workerCommunications) {
+ WorkerClientServer<I, V, E, M> workerCommunications) {
this.workerCommunications = workerCommunications;
return this;
}
- public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
+ public WorkerClientServer<I, V, E, M> getWorkerCommunications() {
return workerCommunications;
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Wed May 9 18:47:32 2012
@@ -18,9 +18,14 @@
package org.apache.giraph.graph;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.json.JSONException;
@@ -38,7 +43,8 @@ import org.json.JSONObject;
@SuppressWarnings("rawtypes")
public class VertexMutations<I extends WritableComparable,
V extends Writable, E extends Writable,
- M extends Writable> implements VertexChanges<I, V, E, M> {
+ M extends Writable> implements VertexChanges<I, V, E, M>,
+ Writable, Configurable {
/** List of added vertices during the last superstep */
private final List<BasicVertex<I, V, E, M>> addedVertexList =
new ArrayList<BasicVertex<I, V, E, M>>();
@@ -48,12 +54,76 @@ public class VertexMutations<I extends W
private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
/** List of removed edges */
private final List<I> removedEdgeList = new ArrayList<I>();
+ /** Configuration */
+ private Configuration conf;
+
+  /**
+   * Create a shallow copy of these mutations. The copy gets its own lists,
+   * but the list elements and the configuration are shared with this object.
+   *
+   * @return Copied vertex mutations
+   */
+  public VertexMutations<I, V, E, M> copy() {
+    VertexMutations<I, V, E, M> duplicate = new VertexMutations<I, V, E, M>();
+    duplicate.conf = conf;
+    duplicate.removedVertexCount = removedVertexCount;
+    duplicate.addedVertexList.addAll(addedVertexList);
+    duplicate.addedEdgeList.addAll(addedEdgeList);
+    duplicate.removedEdgeList.addAll(removedEdgeList);
+    return duplicate;
+  }
@Override
public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
return addedVertexList;
}
+ /**
+ * Replace the contents of this object with mutations read from the input,
+ * in the same order that write() emits them: added vertices, removed vertex
+ * count, added edges (destination vertex id, then edge value) and removed
+ * edge ids. New vertices, vertex ids and edge values are instantiated
+ * through BspUtils using the configuration held by this object.
+ *
+ * @param input Input to read the mutations from
+ * @throws IOException Propagated from the underlying reads
+ */
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ // Drop previous list contents; removedVertexCount is overwritten below
+ addedVertexList.clear();
+ addedEdgeList.clear();
+ removedEdgeList.clear();
+
+ // NOTE(review): conf stays null until setConf() is called; presumably the
+ // BspUtils factories need a non-null Configuration - confirm with callers
+ int addedVertexListSize = input.readInt();
+ for (int i = 0; i < addedVertexListSize; ++i) {
+ BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+ vertex.readFields(input);
+ addedVertexList.add(vertex);
+ }
+ removedVertexCount = input.readInt();
+ int addedEdgeListSize = input.readInt();
+ for (int i = 0; i < addedEdgeListSize; ++i) {
+ // Each edge is stored as its destination vertex id, then its value
+ I destVertex = BspUtils.createVertexIndex(conf);
+ destVertex.readFields(input);
+ E edgeValue = BspUtils.createEdgeValue(conf);
+ edgeValue.readFields(input);
+ addedEdgeList.add(new Edge<I, E>(destVertex, edgeValue));
+ }
+ int removedEdgeListSize = input.readInt();
+ for (int i = 0; i < removedEdgeListSize; ++i) {
+ // A removed edge is identified only by its destination vertex id
+ I removedEdge = BspUtils.createVertexIndex(conf);
+ removedEdge.readFields(input);
+ removedEdgeList.add(removedEdge);
+ }
+ }
+
+ /**
+ * Serialize the mutations as: added vertex count and vertices, removed
+ * vertex count, added edge count and edges (destination vertex id followed
+ * by edge value), removed edge count and removed edge ids. readFields()
+ * consumes the fields in exactly this order.
+ *
+ * @param output Output to write the mutations to
+ * @throws IOException Propagated from the underlying writes
+ */
+ @Override
+ public void write(DataOutput output) throws IOException {
+ // Every list is prefixed with its size so the reader knows when to stop
+ output.writeInt(addedVertexList.size());
+ for (BasicVertex<I, V, E, M> vertex : addedVertexList) {
+ vertex.write(output);
+ }
+ output.writeInt(removedVertexCount);
+ output.writeInt(addedEdgeList.size());
+ for (Edge<I, E> edge : addedEdgeList) {
+ // Only the destination id and the value of an edge are written
+ edge.getDestVertexId().write(output);
+ edge.getEdgeValue().write(output);
+ }
+ output.writeInt(removedEdgeList.size());
+ for (I removedEdge : removedEdgeList) {
+ removedEdge.write(output);
+ }
+ }
+
/**
* Add a vertex mutation
*
@@ -103,6 +173,18 @@ public class VertexMutations<I extends W
removedEdgeList.add(destinationVertexId);
}
+  /**
+   * Merge another set of vertex mutations into this one. Every list of the
+   * other object is appended to the matching list here, and its removed
+   * vertex count is added to the count of this object.
+   *
+   * @param vertexMutations Object to be added
+   */
+  public void addVertexMutations(VertexMutations<I, V, E, M> vertexMutations) {
+    removedVertexCount += vertexMutations.getRemovedVertexCount();
+    addedVertexList.addAll(vertexMutations.getAddedVertexList());
+    removedEdgeList.addAll(vertexMutations.getRemovedEdgeList());
+    addedEdgeList.addAll(vertexMutations.getAddedEdgeList());
+  }
+
@Override
public String toString() {
JSONObject jsonObject = new JSONObject();
@@ -117,4 +199,14 @@ public class VertexMutations<I extends W
e);
}
}
+
+ /**
+ * Get the configuration held by this object.
+ *
+ * @return Stored configuration (null until one has been assigned)
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Store the configuration that readFields() hands to BspUtils when it
+ * instantiates vertices, vertex ids and edge values.
+ *
+ * @param conf Configuration to keep
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed May 9 18:47:32 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.net.InetSocketAddress;
import org.apache.hadoop.io.Writable;
@@ -69,6 +70,10 @@ public class WorkerInfo implements Writa
return hostnameId;
}
+ /**
+ * Build a socket address from the hostname and port of this worker.
+ * A new InetSocketAddress is created on every call.
+ *
+ * @return Socket address for (hostname, port)
+ */
+ public InetSocketAddress getHostnamePort() {
+ return new InetSocketAddress(hostname, port);
+ }
+
public int getPort() {
return port;
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Wed May 9 18:47:32 2012
@@ -33,57 +33,60 @@ import org.junit.Test;
* Unit test for automated checkpoint restarting
*/
public class TestAutoCheckpoint extends BspCase {
- /** Where the checkpoints will be stored and restarted */
- private final String HDFS_CHECKPOINT_DIR =
- "/tmp/testBspCheckpoints";
+ /** Where the checkpoints will be stored and restarted */
+ private final String HDFS_CHECKPOINT_DIR =
+ "/tmp/testBspCheckpoints";
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public TestAutoCheckpoint(String testName) {
- super(testName);
- }
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public TestAutoCheckpoint(String testName) {
+ super(testName);
+ }
-
- public TestAutoCheckpoint() {
- super(TestAutoCheckpoint.class.getName());
- }
- /**
- * Run a job that requires checkpointing and will have a worker crash
- * and still recover from a previous checkpoint.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testSingleFault()
- throws IOException, InterruptedException, ClassNotFoundException {
- if (getJobTracker() == null) {
- System.out.println(
- "testSingleFault: Ignore this test in local mode.");
- return;
- }
- GiraphJob job = new GiraphJob(getCallingMethodName());
- setupConfiguration(job);
- job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT,
- true);
- job.getConfiguration().setInt("mapred.map.max.attempts", 4);
- job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000);
- job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
- HDFS_CHECKPOINT_DIR);
- job.getConfiguration().setBoolean(
- GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
- job.setVertexClass(SimpleCheckpointVertex.class);
- job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- job.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- Path outputPath = new Path("/tmp/" + getCallingMethodName());
- removeAndSetOutput(job, outputPath);
- assertTrue(job.run(true));
+ public TestAutoCheckpoint() {
+ super(TestAutoCheckpoint.class.getName());
+ }
+
+ /**
+ * Run a job that requires checkpointing and will have a worker crash
+ * and still recover from a previous checkpoint.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSingleFault()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ if (getJobTracker() == null) {
+ System.out.println(
+ "testSingleFault: Ignore this test in local mode.");
+ return;
}
+ GiraphJob job = new GiraphJob(getCallingMethodName());
+ setupConfiguration(job);
+ job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT,
+ true);
+ job.getConfiguration().setInt("mapred.map.max.attempts", 4);
+ // Trigger failure faster
+ job.getConfiguration().setInt("mapred.task.timeout", 30000);
+ job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000);
+ job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
+ job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
+ HDFS_CHECKPOINT_DIR);
+ job.getConfiguration().setBoolean(
+ GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ Path outputPath = new Path("/tmp/" + getCallingMethodName());
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ }
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Wed May 9 18:47:32 2012
@@ -44,7 +44,7 @@ public class TestJsonBase64Format extend
public TestJsonBase64Format(String testName) {
super(testName);
}
-
+
public TestJsonBase64Format() {
super(TestJsonBase64Format.class.getName());
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Wed May 9 18:47:32 2012
@@ -45,7 +45,7 @@ public class TestManualCheckpoint extend
public TestManualCheckpoint(String testName) {
super(testName);
}
-
+
public TestManualCheckpoint() {
super(TestManualCheckpoint.class.getName());
}
@@ -65,6 +65,7 @@ public class TestManualCheckpoint extend
HDFS_CHECKPOINT_DIR);
job.getConfiguration().setBoolean(
GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+ job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
job.setVertexClass(SimpleCheckpointVertex.class);
job.setWorkerContextClass(
SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed May 9 18:47:32 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.mockito.Mockito.mock;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Test;
+
+/**
+ * Test the netty connections
+ */
+public class ConnectionTest {
+  /**
+   * Start one netty server, connect one netty client to the address the
+   * server reports, then stop the client and the server again.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectSingleClientServer() throws IOException {
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+
+    // The client is constructed from a mapper context; a mock stands in here
+    @SuppressWarnings("rawtypes")
+    Context mockedContext = mock(Context.class);
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(mockedContext);
+    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+
+    // Shut down in the reverse order of start-up
+    client.stop();
+    server.stop();
+  }
+
+  /**
+   * Test connecting one client to three servers. All three servers share
+   * the same configuration and server data; the client is given the address
+   * of each of them.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectOneClientToThreeServers() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server1.start();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server2 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server2.start();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server3 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server3.start();
+
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    // Hand the client every server address; with an empty list it would have
+    // nothing to connect to and the test would not exercise any connection
+    List<InetSocketAddress> serverAddresses =
+        new ArrayList<InetSocketAddress>();
+    serverAddresses.add(server1.getMyAddress());
+    serverAddresses.add(server2.getMyAddress());
+    serverAddresses.add(server3.getMyAddress());
+    client.connectAllAdddresses(serverAddresses);
+
+    client.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  /**
+   * Start a single netty server, then create three clients one after the
+   * other and connect each of them to that server. The clients are stopped
+   * in creation order before the server is stopped.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectThreeClientsToOneServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+
+    // Create and connect the clients one at a time
+    List<NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>>
+        clients = new ArrayList<NettyClient<IntWritable, IntWritable,
+        IntWritable, IntWritable>>();
+    for (int i = 0; i < 3; ++i) {
+      NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+          new NettyClient<IntWritable, IntWritable, IntWritable,
+          IntWritable>(context);
+      client.connectAllAdddresses(
+          Collections.singleton(server.getMyAddress()));
+      clients.add(client);
+    }
+
+    for (NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+        client : clients) {
+      client.stop();
+    }
+    server.stop();
+  }
+}
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Wed May 9 18:47:32 2012
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Test all the different netty requests.
+ */
+public class RequestTest {
+ /** Configuration */
+ private Configuration conf;
+ /** Server data */
+ private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ serverData;
+ /** Server */
+ private NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>
+ server;
+ /** Client */
+ private NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+ client;
+
+ /**
+ * Only for testing. A vertex whose id, value, edge value and message types
+ * are all IntWritable and whose compute() does nothing. setUp() registers
+ * it as the vertex class and the tests use it as request payload.
+ */
+ public static class TestVertex extends EdgeListVertex<IntWritable,
+ IntWritable, IntWritable, IntWritable> {
+ @Override
+ public void compute(Iterator<IntWritable> msgIterator) throws IOException {
+ // Intentionally empty: these tests never run a superstep
+ }
+ }
+
+  /**
+   * Register the Giraph classes in a fresh configuration, start a netty
+   * server on it and connect a netty client to that server.
+   *
+   * @throws IOException
+   */
+  @Before
+  public void setUp() throws IOException {
+    // Setup the conf: TestVertex as the vertex, IntWritable for every type
+    conf = new Configuration();
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+    conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
+        IntWritable.class, WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_CLASS, TestVertex.class, BasicVertex.class);
+
+    // Start the service
+    serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+
+    // Connect a client that is built from a mocked mapper context
+    @SuppressWarnings("rawtypes")
+    Context mockedContext = mock(Context.class);
+    client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(mockedContext);
+    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+  }
+
+  /**
+   * Send ten vertices (ids 0 through 9) for a single partition and verify
+   * that the server stored all of them under that partition id.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void sendVertexPartition() throws IOException {
+    // Data to send
+    int partitionId = 13;
+    int vertexCount = 10;
+    Collection<BasicVertex<IntWritable, IntWritable, IntWritable,
+        IntWritable>> vertices =
+        new ArrayList<BasicVertex<IntWritable, IntWritable,
+        IntWritable, IntWritable>>();
+    for (int i = 0; i < vertexCount; ++i) {
+      TestVertex vertex = new TestVertex();
+      vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
+      vertices.add(vertex);
+    }
+
+    // Send the request
+    SendVertexRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request =
+        new SendVertexRequest<IntWritable, IntWritable,
+        IntWritable, IntWritable>(partitionId, vertices);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    Map<Integer, Collection<BasicVertex<IntWritable, IntWritable,
+        IntWritable, IntWritable>>> partitionVertexMap =
+        serverData.getPartitionVertexMap();
+    synchronized (partitionVertexMap) {
+      assertTrue(partitionVertexMap.containsKey(partitionId));
+      Collection<BasicVertex<IntWritable, IntWritable,
+          IntWritable, IntWritable>> received =
+          partitionVertexMap.get(partitionId);
+      // The id sum cannot detect a lost vertex 0, so check the count as well
+      assertEquals(vertexCount, received.size());
+      int total = 0;
+      for (BasicVertex<IntWritable, IntWritable,
+          IntWritable, IntWritable> vertex : received) {
+        total += vertex.getVertexId().get();
+      }
+      // JUnit takes (expected, actual): 0 + 1 + ... + 9 = 45
+      assertEquals(45, total);
+    }
+  }
+
+  /**
+   * Send messages for the vertex ids 1 through 6, where vertex i receives
+   * the messages 0 .. i - 1, and verify the sums of the ids and of the
+   * message values that arrived at the server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void sendPartitionMessagesRequest() throws IOException {
+    // Data to send
+    int partitionId = 17;
+    Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
+        Maps.newHashMap();
+    for (int id = 1; id <= 6; ++id) {
+      Collection<IntWritable> outgoing = Lists.newArrayList();
+      for (int value = 0; value < id; ++value) {
+        outgoing.add(new IntWritable(value));
+      }
+      vertexIdMessages.put(new IntWritable(id), outgoing);
+    }
+
+    // Send the request
+    SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request =
+        new SendPartitionMessagesRequest<IntWritable, IntWritable,
+        IntWritable, IntWritable>(partitionId, vertexIdMessages);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    ConcurrentHashMap<IntWritable, Collection<IntWritable>> received =
+        serverData.getTransientMessages();
+    int idTotal = 0;
+    int messageTotal = 0;
+    for (Entry<IntWritable, Collection<IntWritable>> entry :
+        received.entrySet()) {
+      idTotal += entry.getKey().get();
+      Collection<IntWritable> messages = entry.getValue();
+      synchronized (messages) {
+        for (IntWritable message : messages) {
+          messageTotal += message.get();
+        }
+      }
+    }
+    // 1 + 2 + ... + 6 = 21 and 0 + 1 + 3 + 6 + 10 + 15 = 35
+    assertEquals(21, idTotal);
+    assertEquals(35, messageTotal);
+  }
+
+  /**
+   * Send mutations for the vertex ids 0 through 10 and verify what arrived
+   * at the server. Every vertex id carries three added vertices (values
+   * 0, 1, 2), two vertex removals, five added edges (values 0, 2, 4, 6, 8)
+   * and seven removed edges (destination ids 0 through 6).
+   *
+   * @throws IOException
+   */
+  @Test
+  public void sendPartitionMutationsRequest() throws IOException {
+    // Data to send
+    int partitionId = 19;
+    int vertexIdCount = 11;
+    Map<IntWritable, VertexMutations<IntWritable, IntWritable,
+        IntWritable, IntWritable>> vertexIdMutations =
+        Maps.newHashMap();
+    for (int i = 0; i < vertexIdCount; ++i) {
+      VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
+          mutations =
+          new VertexMutations<IntWritable, IntWritable,
+          IntWritable, IntWritable>();
+      for (int j = 0; j < 3; ++j) {
+        TestVertex vertex = new TestVertex();
+        vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
+        mutations.addVertex(vertex);
+      }
+      for (int j = 0; j < 2; ++j) {
+        mutations.removeVertex();
+      }
+      for (int j = 0; j < 5; ++j) {
+        Edge<IntWritable, IntWritable> edge =
+            new Edge<IntWritable, IntWritable>(
+                new IntWritable(i), new IntWritable(2 * j));
+        mutations.addEdge(edge);
+      }
+      for (int j = 0; j < 7; ++j) {
+        mutations.removeEdge(new IntWritable(j));
+      }
+      vertexIdMutations.put(new IntWritable(i), mutations);
+    }
+
+    // Send the request
+    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request =
+        new SendPartitionMutationsRequest<IntWritable, IntWritable,
+        IntWritable, IntWritable>(partitionId, vertexIdMutations);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
+        IntWritable, IntWritable>> inVertexIdMutations =
+        serverData.getVertexMutations();
+    // The key sum cannot detect a lost key 0, so check the entry count too
+    assertEquals(vertexIdCount, inVertexIdMutations.size());
+    int keySum = 0;
+    for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
+        IntWritable, IntWritable>> entry :
+        inVertexIdMutations.entrySet()) {
+      synchronized (entry.getValue()) {
+        keySum += entry.getKey().get();
+
+        // Added vertices: values 0 + 1 + 2
+        int vertexValueSum = 0;
+        for (BasicVertex<IntWritable, IntWritable, IntWritable, IntWritable>
+            vertex : entry.getValue().getAddedVertexList()) {
+          vertexValueSum += vertex.getVertexValue().get();
+        }
+        assertEquals(3, vertexValueSum);
+
+        assertEquals(2, entry.getValue().getRemovedVertexCount());
+
+        // Added edges: values 0 + 2 + 4 + 6 + 8
+        int addedEdgeValueSum = 0;
+        for (Edge<IntWritable, IntWritable> edge :
+            entry.getValue().getAddedEdgeList()) {
+          addedEdgeValueSum += edge.getEdgeValue().get();
+        }
+        assertEquals(20, addedEdgeValueSum);
+
+        // Removed edges: seven destination ids, 0 + 1 + ... + 6
+        assertEquals(7, entry.getValue().getRemovedEdgeList().size());
+        int removedEdgeIdSum = 0;
+        for (IntWritable removedEdge :
+            entry.getValue().getRemovedEdgeList()) {
+          removedEdgeIdSum += removedEdge.get();
+        }
+        assertEquals(21, removedEdgeIdSum);
+      }
+    }
+    // 0 + 1 + ... + 10 = 55
+    assertEquals(55, keySum);
+  }
+}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java Wed May 9 18:47:32 2012
@@ -113,7 +113,8 @@ public class SimpleShortestPathVertexTes
"[1,0,[[2,1],[3,3]]]",
"[2,0,[[3,1],[4,10]]]",
"[3,0,[[4,2]]]",
- "[4,0,[]]" };
+ "[4,0,[]]"
+ };
// start from vertex 1
Map<String, String> params = Maps.newHashMap();
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Wed May 9 18:47:32 2012
@@ -18,7 +18,7 @@
package org.apache.giraph.utils;
-import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.BasicVertex;
import org.apache.hadoop.conf.Configuration;
@@ -47,13 +47,13 @@ public class MockUtils {
private final GraphState<I, V, E, M> graphState;
private final Mapper.Context context;
private final Configuration conf;
- private final WorkerCommunications communications;
+ private final WorkerClientServer communications;
public MockedEnvironment() {
graphState = Mockito.mock(GraphState.class);
context = Mockito.mock(Mapper.Context.class);
conf = Mockito.mock(Configuration.class);
- communications = Mockito.mock(WorkerCommunications.class);
+ communications = Mockito.mock(WorkerClientServer.class);
}
/** the injected graph state */
@@ -72,7 +72,7 @@ public class MockUtils {
}
/** the injected worker communications */
- public WorkerCommunications getCommunications() {
+ public WorkerClientServer getCommunications() {
return communications;
}