You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/17 06:33:18 UTC
svn commit: r1399090 [1/3] - in /giraph/trunk: ./
giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/ giraph/s...
Author: aching
Date: Wed Oct 17 04:33:16 2012
New Revision: 1399090
URL: http://svn.apache.org/viewvc?rev=1399090&view=rev
Log:
GIRAPH-374: Multithreading in input split loading and compute
(aching).
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
- copied, changed from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
- copied, changed from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java
Removed:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 17 04:33:16 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-374: Multithreading in input split loading and compute (aching).
+
GIRAPH-375: Cleaner MutableVertex API (apresta)
GIRAPH-371: Replace BspUtils in giraph-formats-contrib for
@@ -8,7 +10,8 @@ Release 0.2.0 - unreleased
GIRAPH-369: bin/giraph broken (Nitay Joffe via ereisman)
- GIRAPH-368: HBase Vertex I/O formats handle setConf() internally (bfem via ereisman)
+ GIRAPH-368: HBase Vertex I/O formats handle setConf() internally
+ (bfem via ereisman)
GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java Wed Oct 17 04:33:16 2012
@@ -48,19 +48,18 @@ import java.util.List;
*
* Works with {@link HBaseVertexOutputFormat}
*
- *
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public abstract class HBaseVertexInputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
+public abstract class HBaseVertexInputFormat<
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Oct 17 04:33:16 2012
@@ -105,6 +105,11 @@ public class GiraphConfiguration extends
/** Default log level is INFO (same as Hadoop) */
public static final String LOG_LEVEL_DEFAULT = "info";
+ /** Use thread level debugging? */
+ public static final String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
+ /** Default to not use thread-level debugging */
+ public static final boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+
/**
* Minimum percent of the maximum number of workers that have responded
* in order to continue progressing. (float)
@@ -343,6 +348,17 @@ public class GiraphConfiguration extends
public static final String MSG_NUM_FLUSH_THREADS =
"giraph.msgNumFlushThreads";
+ /** Number of threads for vertex computation */
+ public static final String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
+ /** Default number of threads for vertex computation */
+ public static final int NUM_COMPUTE_THREADS_DEFAULT = 1;
+
+ /** Number of threads for input splits loading */
+ public static final String NUM_INPUT_SPLITS_THREADS =
+ "giraph.numInputSplitsThreads";
+ /** Default number of threads for input splits loading */
+ public static final int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+
/** Number of poll attempts prior to failing the job (int) */
public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
/** Default poll attempts */
@@ -748,6 +764,15 @@ public class GiraphConfiguration extends
return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
}
+ /**
+ * Use the log thread layout option?
+ *
+ * @return True if use the log thread layout option, false otherwise
+ */
+ public boolean useLogThreadLayout() {
+ return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT);
+ }
+
public boolean getLocalTestMode() {
return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
}
@@ -844,4 +869,34 @@ public class GiraphConfiguration extends
public boolean authenticate() {
return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
}
+
+ /**
+ * Set the number of compute threads
+ *
+ * @param numComputeThreads Number of compute threads to use
+ */
+ public void setNumComputeThreads(int numComputeThreads) {
+ setInt(NUM_COMPUTE_THREADS, numComputeThreads);
+ }
+
+ public int getNumComputeThreads() {
+ return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT);
+ }
+
+ /**
+ * Set the number of input split threads
+ *
+ * @param numInputSplitsThreads Number of input split threads to use
+ */
+ public void setNumInputSplitsThreads(int numInputSplitsThreads) {
+ setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads);
+ }
+
+ public int getNumInputSplitsThreads() {
+ return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT);
+ }
+
+ public long getInputSplitMaxVertices() {
+ return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java Wed Oct 17 04:33:16 2012
@@ -35,10 +35,7 @@ import java.io.IOException;
@SuppressWarnings("rawtypes")
public interface CentralizedService<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Setup (must be called prior to any other function)
- */
- void setup();
+
/**
* Get the current global superstep of the application to work on.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Wed Oct 17 04:33:16 2012
@@ -39,6 +39,11 @@ public interface CentralizedServiceMaste
V extends Writable, E extends Writable, M extends Writable> extends
CentralizedService<I, V, E, M> {
/**
+ * Setup (must be called prior to any other function)
+ */
+ void setup();
+
+ /**
* Become the master.
* @return true if became the master, false if the application is done.
*/
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct 17 04:33:16 2012
@@ -23,6 +23,10 @@ import java.util.Collection;
import java.util.List;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.graph.WorkerAggregatorUsage;
import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.hadoop.io.Writable;
@@ -50,6 +54,13 @@ public interface CentralizedServiceWorke
V extends Writable, E extends Writable, M extends Writable>
extends CentralizedService<I, V, E, M> {
/**
+ * Setup (must be called prior to any other function)
+ *
+ * @return Finished superstep stats for the input superstep
+ */
+ FinishedSuperstepStats setup();
+
+ /**
* Get the worker information
*
* @return Worker information
@@ -57,6 +68,15 @@ public interface CentralizedServiceWorke
WorkerInfo getWorkerInfo();
/**
+ * Get the worker client (for instantiating WorkerClientRequestProcessor
+ * instances).
+ *
+ * @return Worker client
+ */
+ WorkerClient<I, V, E, M> getWorkerClient();
+
+ /**
+ * Get the worker context.
*
* @return worker's WorkerContext
*/
@@ -92,27 +112,33 @@ public interface CentralizedServiceWorke
* appropriate superstep.
*
* @param superstep which checkpoint to use
+ * @return Graph-wide vertex and edge counts
* @throws IOException
*/
- void loadCheckpoint(long superstep) throws IOException;
+ VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
/**
* Take all steps prior to actually beginning the computation of a
* superstep.
*
+ * @param graphState Current graph state
* @return Collection of all the partition owners from the master for this
* superstep.
*/
- Collection<? extends PartitionOwner> startSuperstep();
+ Collection<? extends PartitionOwner> startSuperstep(
+ GraphState<I, V, E, M> graphState);
/**
* Worker is done with its portion of the superstep. Report the
* worker level statistics after the computation.
*
+ * @param graphState Current graph state
* @param partitionStatsList All the partition stats for this worker
- * @return true if this is the last superstep, false otherwise
+ * @return Stats of the superstep completion
*/
- boolean finishSuperstep(List<PartitionStats> partitionStatsList);
+ FinishedSuperstepStats finishSuperstep(
+ GraphState<I, V, E, M> graphState,
+ List<PartitionStats> partitionStatsList);
/**
* Get the partition that a vertex id would belong to.
@@ -158,7 +184,7 @@ public interface CentralizedServiceWorke
/**
* If desired by the user, vertex partitions are redistributed among
- * workers according to the chosen {@link WorkerGraphPartitioner}.
+ * workers according to the chosen WorkerGraphPartitioner.
*
* @param masterSetPartitionOwners Partition owner info passed from the
* master.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Wed Oct 17 04:33:16 2012
@@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
/**
* Aggregates the messages to be send to workers so they can be sent
- * in bulk.
+ * in bulk. Not thread-safe.
*
* @param <I> Vertex id
* @param <M> Message data
@@ -98,7 +98,7 @@ public class SendMessageCache<I extends
// Add the message
final int originalMessageCount = messages.size();
messages.add(message);
- if (combiner != null) {
+ if (combiner != null && originalMessageCount > 0) {
try {
messages = Lists.newArrayList(combiner.combine(destVertexId, messages));
} catch (IOException e) {
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GiraphTransferRegulator;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+/**
+ * Caches partition vertices prior to sending. Aggregating these requests
+ * will make larger, more efficient requests. Not thread-safe.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class SendPartitionCache<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SendPartitionCache.class);
+ /** Input split vertex cache (only used when loading from input split) */
+ private final Map<PartitionOwner, Partition<I, V, E, M>>
+ ownerPartitionMap = Maps.newHashMap();
+ /** Number of messages in each partition */
+ private final Map<PartitionOwner, Integer> messageCountMap =
+ Maps.newHashMap();
+ /** Context */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /**
+ * Regulates the size of outgoing Collections of vertices read
+ * by the local worker during INPUT_SUPERSTEP that are to be
+ * transferred from <code>ownerPartitionMap</code> to the owner
+ * of their initial, master-assigned Partition.
+ */
+ private final GiraphTransferRegulator transferRegulator;
+
+ /**
+ * Constructor.
+ *
+ * @param context Context
+ * @param configuration Configuration
+ */
+ public SendPartitionCache(
+ Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+ this.context = context;
+ this.configuration = configuration;
+ transferRegulator =
+ new GiraphTransferRegulator(configuration);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SendPartitionCache: maxVerticesPerTransfer = " +
+ transferRegulator.getMaxVerticesPerTransfer());
+ LOG.info("SendPartitionCache: maxEdgesPerTransfer = " +
+ transferRegulator.getMaxEdgesPerTransfer());
+ }
+ }
+
+ /**
+ * Add a vertex to the cache, returning the partition if full
+ *
+ * @param partitionOwner Partition owner of the vertex
+ * @param vertex Vertex to add
+ * @return A partition to send or null, if requirements are not met
+ */
+ public Partition<I, V, E, M> addVertex(PartitionOwner partitionOwner,
+ Vertex<I, V, E, M> vertex) {
+ Partition<I, V, E, M> partition =
+ ownerPartitionMap.get(partitionOwner);
+ if (partition == null) {
+ partition = new Partition<I, V, E, M>(
+ configuration,
+ partitionOwner.getPartitionId(),
+ context);
+ ownerPartitionMap.put(partitionOwner, partition);
+ }
+ transferRegulator.incrementCounters(partitionOwner, vertex);
+
+ Vertex<I, V, E, M> oldVertex =
+ partition.putVertex(vertex);
+ if (oldVertex != null) {
+ LOG.warn("addVertex: Replacing vertex " + oldVertex +
+ " with " + vertex);
+ }
+
+ // Requirements met to transfer?
+ if (transferRegulator.transferThisPartition(partitionOwner)) {
+ return ownerPartitionMap.remove(partitionOwner);
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the owner partition map (for flushing)
+ *
+ * @return Owner partition map
+ */
+ public Map<PartitionOwner, Partition<I, V, E, M>> getOwnerPartitionMap() {
+ return ownerPartitionMap;
+ }
+
+ /**
+ * Clear the cache.
+ */
+ public void clear() {
+ ownerPartitionMap.clear();
+ }
+}
+
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 17 04:33:16 2012
@@ -18,11 +18,11 @@
package org.apache.giraph.comm;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -30,7 +30,8 @@ import org.apache.hadoop.io.WritableComp
import java.io.IOException;
/**
- * Public interface for workers to do message communication
+ * Public interface for workers to establish connections and send aggregated
+ * requests.
*
* @param <I> Vertex id
* @param <V> Vertex value
@@ -57,84 +58,47 @@ else[HADOOP_NON_SECURE]*/
/*end[HADOOP_NON_SECURE]*/
/**
- * Fix changes to the workers and the mapping between partitions and
- * workers.
- */
- void fixPartitionIdToSocketAddrMap();
-
- /**
* Lookup PartitionOwner for a vertex.
+ *
* @param vertexId id to look up.
* @return PartitionOwner holding the vertex.
*/
PartitionOwner getVertexPartitionOwner(I vertexId);
/**
- * Sends a message to destination vertex.
- *
- * @param destVertexId Destination vertex id.
- * @param message Message to send.
- */
- void sendMessageRequest(I destVertexId, M message);
-
- /**
- * Sends a partition to the appropriate partition owner
- *
- * @param workerInfo Owner the vertices belong to
- * @param partition Partition to send
- */
- void sendPartitionRequest(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition);
-
- /**
- * Sends a request to the appropriate vertex range owner to add an edge
+ * Make sure that all the connections to the partitions owners have been
+ * established.
*
- * @param vertexIndex Index of the vertex to get the request
- * @param edge Edge to be added
- * @throws IOException
- */
- void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
-
- /**
- * Sends a request to the appropriate vertex range owner to remove an edge
- *
- * @param vertexIndex Index of the vertex to get the request
- * @param destinationVertexIndex Index of the edge to be removed
- * @throws IOException
- */
- void removeEdgeRequest(I vertexIndex, I destinationVertexIndex)
- throws IOException;
-
- /**
- * Sends a request to the appropriate vertex range owner to add a vertex
- *
- * @param vertex Vertex to be added
- * @throws IOException
+ * @param partitionOwners Partition owners to establish/check connections
*/
- void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException;
+ void openConnections(
+ Collection<? extends PartitionOwner> partitionOwners);
/**
- * Sends a request to the appropriate vertex range owner to remove a vertex
+ * Fill the socket address cache for the worker info and its partition.
*
- * @param vertexIndex Index of the vertex to be removed
- * @throws IOException
+ * @param workerInfo Worker information to get the socket address
+ * @param partitionId Partition id to look up.
+ * @return address of the vertex range server containing this vertex
*/
- void removeVertexRequest(I vertexIndex) throws IOException;
+ InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+ int partitionId);
/**
- * Flush all outgoing messages. This will synchronously ensure that all
- * messages have been send and delivered prior to returning.
+ * Send a request to a remote server (should be already connected)
*
- * @throws IOException
+ * @param destWorkerId Destination worker id
+ * @param remoteServer Server to send the request to
+ * @param request Request to send
*/
- void flush() throws IOException;
+ void sendWritableRequest(Integer destWorkerId,
+ InetSocketAddress remoteServer,
+ WritableRequest request);
/**
- * Get the messages sent during this superstep and clear them.
- *
- * @return Number of messages sent before the reset.
+ * Wait until all the outstanding requests are completed.
*/
- long resetMessageCount();
+ void waitAllRequests();
/**
* Closes all connections.
Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java (from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java&r1=1399043&r2=1399090&rev=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java Wed Oct 17 04:33:16 2012
@@ -15,60 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.comm;
-import org.apache.giraph.graph.Vertex;
+import java.io.IOException;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
-
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
-
/**
- * Public interface for workers to do message communication
+ * Aggregates IPC requests and sends them off
*
- * @param <I> Vertex id
+ * @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
* @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public interface WorkerClient<I extends WritableComparable,
+public interface WorkerClientRequestProcessor<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
-
- /**
- * Setup the client.
- */
-/*if[HADOOP_NON_SECURE]
- void setup();
-else[HADOOP_NON_SECURE]*/
- /**
- * Setup the client.
- *
- * @param authenticate whether to SASL authenticate with server or not:
- * set by giraph.authenticate configuration option.
- */
- void setup(boolean authenticate);
-/*end[HADOOP_NON_SECURE]*/
-
- /**
- * Fix changes to the workers and the mapping between partitions and
- * workers.
- */
- void fixPartitionIdToSocketAddrMap();
-
- /**
- * Lookup PartitionOwner for a vertex.
- * @param vertexId id to look up.
- * @return PartitionOwner holding the vertex.
- */
- PartitionOwner getVertexPartitionOwner(I vertexId);
-
/**
* Sends a message to destination vertex.
*
@@ -78,9 +45,18 @@ else[HADOOP_NON_SECURE]*/
void sendMessageRequest(I destVertexId, M message);
/**
- * Sends a partition to the appropriate partition owner
+ * Sends a vertex to the appropriate partition owner
+ *
+ * @param partitionOwner Owner of the vertex
+ * @param vertex Vertex to send
+ */
+ void sendVertexRequest(PartitionOwner partitionOwner,
+ Vertex<I, V, E, M> vertex);
+
+ /**
+ * Send a partition request (no batching).
*
- * @param workerInfo Owner the vertices belong to
+ * @param workerInfo Worker to send the partition to
* @param partition Partition to send
*/
void sendPartitionRequest(WorkerInfo workerInfo,
@@ -91,7 +67,7 @@ else[HADOOP_NON_SECURE]*/
*
* @param vertexIndex Index of the vertex to get the request
* @param edge Edge to be added
- * @throws IOException
+ * @throws java.io.IOException
*/
void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
@@ -122,8 +98,8 @@ else[HADOOP_NON_SECURE]*/
void removeVertexRequest(I vertexIndex) throws IOException;
/**
- * Flush all outgoing messages. This will synchronously ensure that all
- * messages have been send and delivered prior to returning.
+ * Flush all outgoing messages. This ensures that all the messages have
+ * been sent, but they are not guaranteed to have been delivered yet.
*
* @throws IOException
*/
@@ -137,19 +113,10 @@ else[HADOOP_NON_SECURE]*/
long resetMessageCount();
/**
- * Closes all connections.
- *
- * @throws IOException
- */
- void closeConnections() throws IOException;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
- /**
- * Authenticates, as client, with another BSP worker, as server.
+ * Lookup PartitionOwner for a vertex.
*
- * @throws IOException
+ * @param vertexId id to look up.
+ * @return PartitionOwner holding the vertex.
*/
- void authenticate() throws IOException;
-/*end[HADOOP_NON_SECURE]*/
+ PartitionOwner getVertexPartitionOwner(I vertexId);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Wed Oct 17 04:33:16 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm;
+import org.apache.giraph.graph.GraphState;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -45,8 +46,10 @@ public interface WorkerServer<I extends
/**
* Move the in transition messages to the in messages for every vertex and
* add new connections to any newly appearing IPC proxies.
+ *
+ * @param graphState Current graph state
*/
- void prepareSuperstep();
+ void prepareSuperstep(GraphState<I, V, E, M> graphState);
/**
* Get server data
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java Wed Oct 17 04:33:16 2012
@@ -18,13 +18,14 @@
package org.apache.giraph.comm.netty;
-import java.util.Collection;
import java.util.List;
import com.google.common.collect.Lists;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
/**
- * Maintains multiple channels and rotates between them
+ * Maintains multiple channels and rotates between them. This is thread-safe.
*/
public class ChannelRotater {
/** Index of last used channel */
@@ -52,8 +53,10 @@ public class ChannelRotater {
*
* @param channel Channel to add
*/
- public void addChannel(Channel channel) {
- channelList.add(channel);
+ public synchronized void addChannel(Channel channel) {
+ synchronized (channelList) {
+ channelList.add(channel);
+ }
}
/**
@@ -61,42 +64,62 @@ public class ChannelRotater {
*
* @return Next channel
*/
- public Channel nextChannel() {
+ public synchronized Channel nextChannel() {
if (channelList.isEmpty()) {
throw new IllegalArgumentException("nextChannel: No channels exist!");
}
- incrementIndex();
+ ++index;
+ if (index >= channelList.size()) {
+ index = 0;
+ }
return channelList.get(index);
}
/**
- * Remove the last channel that was given out
+ * Remove a channel
*
- * @return Return the removed channel
+ * @param channel Channel to remove
+ * @return Return true if successful, false otherwise
*/
- public Channel removeLast() {
- Channel channel = channelList.remove(index);
- incrementIndex();
- return channel;
+ public synchronized boolean removeChannel(Channel channel) {
+ boolean success = channelList.remove(channel);
+ if (index >= channelList.size()) {
+ index = 0;
+ }
+ return success;
}
/**
- * Increment the channel index with wrapping
+ * Get the number of channels in this object
+ *
+ * @return Number of channels
*/
- private void incrementIndex() {
- ++index;
- if (index >= channelList.size()) {
- index = 0;
+ public synchronized int size() {
+ return channelList.size();
+ }
+
+ /**
+ * Close the channels
+ *
+ * @param channelFutureListener If desired, pass in a channel future listener
+ */
+ public synchronized void closeChannels(
+ ChannelFutureListener channelFutureListener) {
+ for (Channel channel : channelList) {
+ ChannelFuture channelFuture = channel.close();
+ if (channelFutureListener != null) {
+ channelFuture.addListener(channelFutureListener);
+ }
}
}
/**
- * Get the channels
+ * Get a copy of the channels
*
- * @return Collection of the channels
+ * @return Copy of the channels
*/
- Collection<Channel> getChannels() {
- return channelList;
+ public synchronized Iterable<Channel> getChannels() {
+ return Lists.newArrayList(channelList);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 17 04:33:16 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm.netty;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
@@ -30,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.GiraphConfiguration;
@@ -52,7 +52,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelLocal;
@@ -67,7 +66,7 @@ import org.jboss.netty.handler.execution
import static org.jboss.netty.channel.Channels.pipeline;
/**
- * Netty client for sending requests.
+ * Netty client for sending requests. Thread-safe.
*/
public class NettyClient {
/** Do we have a limit on number of open requests we can have */
@@ -100,8 +99,8 @@ else[HADOOP_NON_SECURE]*/
* Map of the peer connections, mapping from remote socket address to client
* meta data
*/
- private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
- Maps.newHashMap();
+ private final ConcurrentMap<InetSocketAddress, ChannelRotater>
+ addressChannelMap = new MapMaker().makeMap();
/**
* Request map of client request ids to request information.
*/
@@ -214,7 +213,9 @@ else[HADOOP_NON_SECURE]*/
GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
executionHandler = new ExecutionHandler(
new MemoryAwareThreadPoolExecutor(
- executionThreads, 1048576, 1048576));
+ executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+ new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d")
+ .build()));
if (LOG.isInfoEnabled()) {
LOG.info("NettyClient: Using execution handler with " +
executionThreads + " threads after " +
@@ -226,10 +227,10 @@ else[HADOOP_NON_SECURE]*/
bossExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(
- "Giraph Client Netty Boss #%d").build());
+ "netty-client-boss-%d").build());
workerExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(
- "Giraph Client Netty Worker #%d").build());
+ "netty-client-worker-%d").build());
clientId = conf.getInt("mapred.task.partition", -1);
@@ -332,6 +333,12 @@ else[HADOOP_NON_SECURE]*/
this.address = address;
this.taskId = taskId;
}
+
+ @Override
+ public String toString() {
+ return "(future=" + future + ",address=" + address + ",taskId=" +
+ taskId + ")";
+ }
}
/**
@@ -403,8 +410,13 @@ else[HADOOP_NON_SECURE]*/
ChannelRotater rotater =
addressChannelMap.get(waitingConnection.address);
if (rotater == null) {
- rotater = new ChannelRotater(waitingConnection.taskId);
- addressChannelMap.put(waitingConnection.address, rotater);
+ ChannelRotater newRotater =
+ new ChannelRotater(waitingConnection.taskId);
+ rotater = addressChannelMap.putIfAbsent(
+ waitingConnection.address, newRotater);
+ if (rotater == null) {
+ rotater = newRotater;
+ }
}
rotater.addChannel(future.getChannel());
++connected;
@@ -515,30 +527,27 @@ else[HADOOP_NON_SECURE]*/
// in addressChannelMap are closed (success or failure)
int channelCount = 0;
for (ChannelRotater channelRotater : addressChannelMap.values()) {
- channelCount += channelRotater.getChannels().size();
+ channelCount += channelRotater.size();
}
final int done = channelCount;
final AtomicInteger count = new AtomicInteger(0);
for (ChannelRotater channelRotater : addressChannelMap.values()) {
- for (Channel channel : channelRotater.getChannels()) {
- ChannelFuture result = channel.close();
- result.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture cf) {
- context.progress();
- if (count.incrementAndGet() == done) {
- if (LOG.isInfoEnabled()) {
- LOG.info("stop: reached wait threshold, " +
- done + " connections closed, releasing " +
- "NettyClient.bootstrap resources now.");
- }
- bossExecutorService.shutdownNow();
- workerExecutorService.shutdownNow();
- bootstrap.releaseExternalResources();
+ channelRotater.closeChannels(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture cf) {
+ context.progress();
+ if (count.incrementAndGet() == done) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: reached wait threshold, " +
+ done + " connections closed, releasing " +
+ "NettyClient.bootstrap resources now.");
}
+ bossExecutorService.shutdownNow();
+ workerExecutorService.shutdownNow();
+ bootstrap.releaseExternalResources();
}
- });
- }
+ }
+ });
}
}
@@ -561,7 +570,10 @@ else[HADOOP_NON_SECURE]*/
}
// Get rid of the failed channel
- addressChannelMap.get(remoteServer).removeLast();
+ if (!addressChannelMap.get(remoteServer).removeChannel(channel)) {
+ LOG.warn("getNextChannel: Unlikely event that the channel " +
+ channel + " was already removed!");
+ }
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Fixing disconnected channel to " +
remoteServer + ", open = " + channel.isOpen() + ", " +
@@ -744,15 +756,4 @@ else[HADOOP_NON_SECURE]*/
addedRequestInfos.clear();
}
}
-
- /**
- * Returning configuration of the first channel.
- * @throws ArrayIndexOutOfBoundsException if no
- * channels exist in the client's address => channel map.
- * @return ChannelConfig for the first channel (if any).
- */
- public ChannelConfig getChannelConfig() {
- return ((Channel) addressChannelMap.values().toArray()[0]).getConfig();
- }
-
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Oct 17 04:33:16 2012
@@ -36,6 +36,7 @@ import org.apache.giraph.comm.netty.hand
else[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
/*end[HADOOP_NON_SECURE]*/
+import java.util.concurrent.TimeUnit;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.log4j.Logger;
@@ -141,10 +142,10 @@ else[HADOOP_NON_SECURE]*/
bossExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(
- "Giraph Server Netty Boss #%d").build());
+ "netty-server-boss-%d").build());
workerExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(
- "Giraph Server Netty Worker #%d").build());
+ "netty-server-worker-%d").build());
try {
this.localHostname = InetAddress.getLocalHost().getHostName();
@@ -175,7 +176,9 @@ else[HADOOP_NON_SECURE]*/
int executionThreads = conf.getNettyServerExecutionThreads();
executionHandler = new ExecutionHandler(
new MemoryAwareThreadPoolExecutor(
- executionThreads, 1048576, 1048576));
+ executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+ new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d").
+ build()));
if (LOG.isInfoEnabled()) {
LOG.info("NettyServer: Using execution handler with " +
executionThreads + " threads after " +
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 17 04:33:16 2012
@@ -18,25 +18,13 @@
package org.apache.giraph.comm.netty;
+import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.SendMessageCache;
-import org.apache.giraph.comm.SendMutationsCache;
-import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
-import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
-import org.apache.giraph.comm.requests.SendVertexRequest;
-import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -48,9 +36,7 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -66,11 +52,10 @@ import java.util.concurrent.ConcurrentHa
public class NettyWorkerClient<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
WorkerClient<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(NettyWorkerClient.class);
/** Signal for getInetSocketAddress() to use WorkerInfo's address */
- private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
+ public static final int NO_PARTITION_ID = Integer.MIN_VALUE;
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
/** Hadoop configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Netty client that does that actual I/O */
@@ -80,26 +65,11 @@ public class NettyWorkerClient<I extends
/**
* Cached map of partition ids to remote socket address.
*/
- private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
+ private final ConcurrentMap<Integer, InetSocketAddress>
+ partitionIndexAddressMap =
new ConcurrentHashMap<Integer, InetSocketAddress>();
- /**
- * Cached map of partitions to vertex indices to messages
- */
- private final SendMessageCache<I, M> sendMessageCache;
- /**
- * Cached map of partitions to vertex indices to mutations
- */
- private final SendMutationsCache<I, V, E, M> sendMutationsCache;
- /** Maximum number of messages per remote worker to cache before sending */
- private final int maxMessagesPerWorker;
- /** Maximum number of mutations per partition before sending */
- private final int maxMutationsPerPartition;
/** Maximum number of attempts to resolve an address*/
private final int maxResolveAddressAttempts;
- /** Messages sent during the last superstep */
- private long totalMsgsSentInSuperstep = 0;
- /** Server data from the server */
- private final ServerData<I, V, E, M> serverData;
/**
* Only constructor.
@@ -107,37 +77,32 @@ public class NettyWorkerClient<I extends
* @param context Context from mapper
* @param configuration Configuration
* @param service Used to get partition mapping
- * @param serverData Server data (used for local requests)
*/
public NettyWorkerClient(
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- CentralizedServiceWorker<I, V, E, M> service,
- ServerData<I, V, E, M> serverData) {
+ CentralizedServiceWorker<I, V, E, M> service) {
this.nettyClient = new NettyClient(context, configuration);
this.conf = configuration;
this.service = service;
- maxMessagesPerWorker = conf.getInt(
- GiraphConfiguration.MSG_SIZE,
- GiraphConfiguration.MSG_SIZE_DEFAULT);
- maxMutationsPerPartition = conf.getInt(
- GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
- GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+
maxResolveAddressAttempts = conf.getInt(
GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
- sendMessageCache = new SendMessageCache<I, M>(conf);
- sendMutationsCache = new SendMutationsCache<I, V, E, M>();
- this.serverData = serverData;
+ }
+
+ public CentralizedServiceWorker<I, V, E, M> getService() {
+ return service;
}
@Override
- public void fixPartitionIdToSocketAddrMap() {
+ public void openConnections(
+ Collection<? extends PartitionOwner> partitionOwners) {
// 1. Fix all the cached inet addresses (remove all changed entries)
// 2. Connect to any new IPC servers
Map<InetSocketAddress, Integer> addressTaskIdMap =
- Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
- for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
+ Maps.newHashMapWithExpectedSize(partitionOwners.size());
+ for (PartitionOwner partitionOwner : partitionOwners) {
InetSocketAddress address =
partitionIndexAddressMap.get(
partitionOwner.getPartitionId());
@@ -165,23 +130,19 @@ public class NettyWorkerClient<I extends
partitionOwner.getWorkerInfo().getTaskId());
}
}
+
addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
null);
nettyClient.connectAllAddresses(addressTaskIdMap);
}
- /**
- * Fill the socket address cache for the worker info and its partition.
- *
- * @param workerInfo Worker information to get the socket address
- * @param partitionId Partition id to look up.
- * @return address of the vertex range server containing this vertex
- */
- private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+ @Override
+ public InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
int partitionId) {
InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
if (address == null) {
- address = resolveAddress(workerInfo.getInetSocketAddress());
+ address = resolveAddress(maxResolveAddressAttempts,
+ workerInfo.getInetSocketAddress());
if (partitionId != NO_PARTITION_ID) {
// Only cache valid partition ids
partitionIndexAddressMap.put(partitionId, address);
@@ -193,12 +154,16 @@ public class NettyWorkerClient<I extends
/**
* Utility method for getInetSocketAddress()
+ *
+ * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
+ * address
* @param address the address we are attempting to resolve
* @return the successfully resolved address.
* @throws IllegalStateException if the address is not resolved
* in <code>maxResolveAddressAttempts</code> tries.
*/
- private InetSocketAddress resolveAddress(InetSocketAddress address) {
+ private static InetSocketAddress resolveAddress(
+ int maxResolveAddressAttempts, InetSocketAddress address) {
int resolveAttempts = 0;
while (address.isUnresolved() &&
resolveAttempts < maxResolveAddressAttempts) {
@@ -211,6 +176,8 @@ public class NettyWorkerClient<I extends
} catch (InterruptedException e) {
LOG.warn("resolveAddress: Interrupted.", e);
}
+ address = new InetSocketAddress(address.getHostName(),
+ address.getPort());
}
if (resolveAttempts >= maxResolveAddressAttempts) {
throw new IllegalStateException("resolveAddress: Couldn't " +
@@ -219,254 +186,24 @@ public class NettyWorkerClient<I extends
return address;
}
- /**
- * When doing the request, short circuit if it is local
- *
- * @param workerInfo Worker info
- * @param remoteServerAddress Remote server address
- * @param writableRequest Request to either submit or run locally
- */
- private void doRequest(WorkerInfo workerInfo,
- InetSocketAddress remoteServerAddress,
- WritableRequest writableRequest) {
- // If this is local, execute locally
- if (service.getWorkerInfo().getTaskId() ==
- workerInfo.getTaskId()) {
- ((WorkerRequest) writableRequest).doRequest(serverData);
- } else {
- nettyClient.sendWritableRequest(
- workerInfo.getTaskId(), remoteServerAddress, writableRequest);
- }
- }
-
- /**
- * Lookup PartitionOwner for a vertex.
- * @param vertexId id to look up.
- * @return PartitionOwner holding the vertex.
- */
+ @Override
public PartitionOwner getVertexPartitionOwner(I vertexId) {
return service.getVertexPartitionOwner(vertexId);
}
@Override
- public void sendMessageRequest(I destVertexId, M message) {
- PartitionOwner owner = getVertexPartitionOwner(destVertexId);
- WorkerInfo workerInfo = owner.getWorkerInfo();
- final int partitionId = owner.getPartitionId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
- ") to " + destVertexId + " on worker " + workerInfo);
- }
- ++totalMsgsSentInSuperstep;
-
- // Add the message to the cache
- int workerMessageCount = sendMessageCache
- .addMessage(workerInfo, partitionId, destVertexId, message);
-
- // Send a request if the cache of outgoing message to
- // the remote worker 'workerInfo' is full enough to be flushed
- if (workerMessageCount >= maxMessagesPerWorker) {
- Map<Integer, Map<I, Collection<M>>> workerMessages =
- sendMessageCache.removeWorkerMessages(workerInfo);
- InetSocketAddress remoteWorkerAddress =
- getInetSocketAddress(workerInfo, partitionId);
- WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
- doRequest(workerInfo, remoteWorkerAddress, writableRequest);
- }
- }
-
- @Override
- public void sendPartitionRequest(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition) {
- final int partitionId = partition.getId();
- InetSocketAddress remoteServerAddress =
- getInetSocketAddress(workerInfo, partitionId);
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendPartitionRequest: Sending to " +
- remoteServerAddress +
- " from " + workerInfo + ", with partition " + partition);
- }
-
- WritableRequest vertexRequest =
- new SendVertexRequest<I, V, E, M>(partitionId,
- partition.getVertices());
- doRequest(workerInfo, remoteServerAddress, vertexRequest);
-
- // Messages are stored separately
- MessageStoreByPartition<I, M> messageStore =
- service.getServerData().getCurrentMessageStore();
- Map<I, Collection<M>> map = Maps.newHashMap();
- int messagesInMap = 0;
- for (I vertexId :
- messageStore.getPartitionDestinationVertices(partitionId)) {
- try {
- Collection<M> messages = messageStore.getVertexMessages(vertexId);
- map.put(vertexId, messages);
- messagesInMap += messages.size();
- } catch (IOException e) {
- throw new IllegalStateException(
- "sendPartitionReq: Got IOException ", e);
- }
- if (messagesInMap > maxMessagesPerWorker) {
- WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(workerInfo, remoteServerAddress, messagesRequest);
- map.clear();
- messagesInMap = 0;
- }
- }
- if (!map.isEmpty()) {
- WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(workerInfo, remoteServerAddress, messagesRequest);
- }
- }
-
- /**
- * Send a mutations request if the maximum number of mutations per partition
- * was met.
- *
- * @param partitionId Partition id
- * @param partitionOwner Owner of the partition
- * @param partitionMutationCount Number of mutations for this partition
- */
- private void sendMutationsRequestIfFull(
- int partitionId, PartitionOwner partitionOwner,
- int partitionMutationCount) {
- // Send a request if enough mutations are there for a partition
- if (partitionMutationCount >= maxMutationsPerPartition) {
- InetSocketAddress remoteServerAddress =
- getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
- Map<I, VertexMutations<I, V, E, M>> partitionMutations =
- sendMutationsCache.removePartitionMutations(partitionId);
- WritableRequest writableRequest =
- new SendPartitionMutationsRequest<I, V, E, M>(
- partitionId, partitionMutations);
- doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
- writableRequest);
- }
- }
-
- @Override
- public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
- IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
- int partitionId = partitionOwner.getPartitionId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
- vertexIndex + " with partition " + partitionId);
- }
-
- // Add the message to the cache
- int partitionMutationCount =
- sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
-
- sendMutationsRequestIfFull(
- partitionId, partitionOwner, partitionMutationCount);
- }
-
- @Override
- public void removeEdgeRequest(I vertexIndex,
- I destinationVertexIndex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
- int partitionId = partitionOwner.getPartitionId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("removeEdgeRequest: Removing edge " +
- destinationVertexIndex +
- " for index " + vertexIndex + " with partition " + partitionId);
- }
-
- // Add the message to the cache
- int partitionMutationCount =
- sendMutationsCache.removeEdgeMutation(
- partitionId, vertexIndex, destinationVertexIndex);
-
- sendMutationsRequestIfFull(
- partitionId, partitionOwner, partitionMutationCount);
- }
-
- @Override
- public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
- int partitionId = partitionOwner.getPartitionId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("addVertexRequest: Sending vertex " + vertex +
- " to partition " + partitionId);
- }
-
- // Add the message to the cache
- int partitionMutationCount =
- sendMutationsCache.addVertexMutation(partitionId, vertex);
-
- sendMutationsRequestIfFull(
- partitionId, partitionOwner, partitionMutationCount);
- }
-
- @Override
- public void removeVertexRequest(I vertexIndex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
- int partitionId = partitionOwner.getPartitionId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("removeVertexRequest: Removing vertex index " +
- vertexIndex + " from partition " + partitionId);
- }
-
- // Add the message to the cache
- int partitionMutationCount =
- sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
-
- sendMutationsRequestIfFull(
- partitionId, partitionOwner, partitionMutationCount);
+ public void sendWritableRequest(Integer destWorkerId,
+ InetSocketAddress remoteServer,
+ WritableRequest request) {
+ nettyClient.sendWritableRequest(destWorkerId, remoteServer, request);
}
@Override
- public void flush() throws IOException {
- // Execute the remaining sends messages (if any)
- Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> remainingMessageCache =
- sendMessageCache.removeAllMessages();
- for (Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
- remainingMessageCache.entrySet()) {
- Iterator<Integer> cachedPartitionId =
- entry.getValue().keySet().iterator();
- final int partitionId = cachedPartitionId.hasNext() ?
- cachedPartitionId.next() : NO_PARTITION_ID;
- InetSocketAddress remoteWorkerAddress =
- getInetSocketAddress(entry.getKey(), partitionId);
- WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
- doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
- }
-
- // Execute the remaining sends mutations (if any)
- Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
- sendMutationsCache.removeAllPartitionMutations();
- for (Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
- remainingMutationsCache.entrySet()) {
- WritableRequest writableRequest =
- new SendPartitionMutationsRequest<I, V, E, M>(
- entry.getKey(), entry.getValue());
- PartitionOwner partitionOwner =
- getVertexPartitionOwner(
- entry.getValue().keySet().iterator().next());
- InetSocketAddress remoteServerAddress =
- getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId());
- doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
- writableRequest);
- }
-
+ public void waitAllRequests() {
nettyClient.waitAllRequests();
}
@Override
- public long resetMessageCount() {
- long messagesSentInSuperstep = totalMsgsSentInSuperstep;
- totalMsgsSentInSuperstep = 0;
- return messagesSentInSuperstep;
- }
-
- @Override
public void closeConnections() throws IOException {
nettyClient.stop();
}
@@ -474,12 +211,12 @@ public class NettyWorkerClient<I extends
/*if[HADOOP_NON_SECURE]
@Override
public void setup() {
- fixPartitionIdToSocketAddrMap();
+ openConnections(service.getPartitionOwners());
}
else[HADOOP_NON_SECURE]*/
@Override
public void setup(boolean authenticate) {
- fixPartitionIdToSocketAddrMap();
+ openConnections(service.getPartitionOwners());
if (authenticate) {
authenticate();
}
Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java&r1=1399043&r2=1399090&rev=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Oct 17 04:33:16 2012
@@ -15,21 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.comm.netty;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
+import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.Edge;
@@ -43,214 +50,76 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
- * Takes users facing APIs in {@link WorkerClient} and implements them
- * using the available {@link WritableRequest} objects.
+ * Aggregates requests and sends them to the thread-safe NettyClient. This
+ * class is not thread-safe and is expected to be used and then thrown away
+ * after a phase of communication has completed.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
* @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public class NettyWorkerClient<I extends WritableComparable,
+public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
- WorkerClient<I, V, E, M> {
+ WorkerClientRequestProcessor<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
- Logger.getLogger(NettyWorkerClient.class);
- /** Signal for getInetSocketAddress() to use WorkerInfo's address */
- private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Netty client that does that actual I/O */
- private final NettyClient nettyClient;
- /** Centralized service, needed to get vertex ranges */
- private final CentralizedServiceWorker<I, V, E, M> service;
- /**
- * Cached map of partition ids to remote socket address.
- */
- private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
- new ConcurrentHashMap<Integer, InetSocketAddress>();
- /**
- * Cached map of partitions to vertex indices to messages
- */
+ Logger.getLogger(NettyWorkerClientRequestProcessor.class);
+ /** Cached partitions of vertices to send */
+ private final SendPartitionCache<I, V, E, M> sendPartitionCache;
+ /** Cached map of partitions to vertex indices to messages */
private final SendMessageCache<I, M> sendMessageCache;
- /**
- * Cached map of partitions to vertex indices to mutations
- */
- private final SendMutationsCache<I, V, E, M> sendMutationsCache;
+ /** Cached map of partitions to vertex indices to mutations */
+ private final SendMutationsCache<I, V, E, M> sendMutationsCache =
+ new SendMutationsCache<I, V, E, M>();
+ /** Worker client from the service worker (may be shared among instances) */
+ private final WorkerClient<I, V, E, M> workerClient;
+ /** Messages sent during the last superstep */
+ private long totalMsgsSentInSuperstep = 0;
/** Maximum number of messages per remote worker to cache before sending */
private final int maxMessagesPerWorker;
/** Maximum number of mutations per partition before sending */
private final int maxMutationsPerPartition;
- /** Maximum number of attempts to resolve an address*/
- private final int maxResolveAddressAttempts;
- /** Messages sent during the last superstep */
- private long totalMsgsSentInSuperstep = 0;
- /** Server data from the server */
+ /** Giraph configuration */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** Service worker */
+ private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ /** Server data from the server (used for local requests) */
private final ServerData<I, V, E, M> serverData;
/**
- * Only constructor.
+ * Constructor.
*
- * @param context Context from mapper
+ * @param context Context
* @param configuration Configuration
- * @param service Used to get partition mapping
- * @param serverData Server data (used for local requests)
+ * @param serviceWorker Service worker
*/
- public NettyWorkerClient(
+ public NettyWorkerClientRequestProcessor(
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- CentralizedServiceWorker<I, V, E, M> service,
- ServerData<I, V, E, M> serverData) {
- this.nettyClient = new NettyClient(context, configuration);
- this.conf = configuration;
- this.service = service;
- maxMessagesPerWorker = conf.getInt(
+ CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+ this.workerClient = serviceWorker.getWorkerClient();
+ this.configuration = configuration;
+
+ sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
+ configuration);
+ sendMessageCache =
+ new SendMessageCache<I, M>(configuration);
+ maxMessagesPerWorker = configuration.getInt(
GiraphConfiguration.MSG_SIZE,
GiraphConfiguration.MSG_SIZE_DEFAULT);
- maxMutationsPerPartition = conf.getInt(
+ maxMutationsPerPartition = configuration.getInt(
GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
- maxResolveAddressAttempts = conf.getInt(
- GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
- GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
- sendMessageCache = new SendMessageCache<I, M>(conf);
- sendMutationsCache = new SendMutationsCache<I, V, E, M>();
- this.serverData = serverData;
- }
-
- @Override
- public void fixPartitionIdToSocketAddrMap() {
- // 1. Fix all the cached inet addresses (remove all changed entries)
- // 2. Connect to any new IPC servers
- Map<InetSocketAddress, Integer> addressTaskIdMap =
- Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
- for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
- InetSocketAddress address =
- partitionIndexAddressMap.get(
- partitionOwner.getPartitionId());
- if (address != null &&
- (!address.getHostName().equals(
- partitionOwner.getWorkerInfo().getHostname()) ||
- address.getPort() !=
- partitionOwner.getWorkerInfo().getPort())) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fixPartitionIdToSocketAddrMap: " +
- "Partition owner " +
- partitionOwner + " changed from " +
- address);
- }
- partitionIndexAddressMap.remove(
- partitionOwner.getPartitionId());
- }
-
- // No need to connect to myself
- if (service.getWorkerInfo().getTaskId() !=
- partitionOwner.getWorkerInfo().getTaskId()) {
- addressTaskIdMap.put(
- getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId()),
- partitionOwner.getWorkerInfo().getTaskId());
- }
- }
- addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
- null);
- nettyClient.connectAllAddresses(addressTaskIdMap);
- }
-
- /**
- * Fill the socket address cache for the worker info and its partition.
- *
- * @param workerInfo Worker information to get the socket address
- * @param partitionId Partition id to look up.
- * @return address of the vertex range server containing this vertex
- */
- private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
- int partitionId) {
- InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
- if (address == null) {
- address = resolveAddress(workerInfo.getInetSocketAddress());
- if (partitionId != NO_PARTITION_ID) {
- // Only cache valid partition ids
- partitionIndexAddressMap.put(partitionId, address);
- }
- }
-
- return address;
- }
-
- /**
- * Utility method for getInetSocketAddress()
- * @param address the address we are attempting to resolve
- * @return the successfully resolved address.
- * @throws IllegalStateException if the address is not resolved
- * in <code>maxResolveAddressAttempts</code> tries.
- */
- private InetSocketAddress resolveAddress(InetSocketAddress address) {
- int resolveAttempts = 0;
- while (address.isUnresolved() &&
- resolveAttempts < maxResolveAddressAttempts) {
- ++resolveAttempts;
- LOG.warn("resolveAddress: Failed to resolve " + address +
- " on attempt " + resolveAttempts + " of " +
- maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- LOG.warn("resolveAddress: Interrupted.", e);
- }
- }
- if (resolveAttempts >= maxResolveAddressAttempts) {
- throw new IllegalStateException("resolveAddress: Couldn't " +
- "resolve " + address + " in " + resolveAttempts + " tries.");
- }
- return address;
- }
-
- /**
- * When doing the request, short circuit if it is local
- *
- * @param workerInfo Worker info
- * @param remoteServerAddress Remote server address
- * @param writableRequest Request to either submit or run locally
- */
- private void doRequest(WorkerInfo workerInfo,
- InetSocketAddress remoteServerAddress,
- WritableRequest writableRequest) {
- // If this is local, execute locally
- if (service.getWorkerInfo().getTaskId() ==
- workerInfo.getTaskId()) {
- ((WorkerRequest) writableRequest).doRequest(serverData);
- } else {
- nettyClient.sendWritableRequest(
- workerInfo.getTaskId(), remoteServerAddress, writableRequest);
- }
- }
-
- /**
- * Lookup PartitionOwner for a vertex.
- * @param vertexId id to look up.
- * @return PartitionOwner holding the vertex.
- */
- public PartitionOwner getVertexPartitionOwner(I vertexId) {
- return service.getVertexPartitionOwner(vertexId);
+ this.serviceWorker = serviceWorker;
+ this.serverData = serviceWorker.getServerData();
}
@Override
public void sendMessageRequest(I destVertexId, M message) {
- PartitionOwner owner = getVertexPartitionOwner(destVertexId);
+ PartitionOwner owner =
+ serviceWorker.getVertexPartitionOwner(destVertexId);
WorkerInfo workerInfo = owner.getWorkerInfo();
final int partitionId = owner.getPartitionId();
if (LOG.isTraceEnabled()) {
@@ -260,18 +129,18 @@ public class NettyWorkerClient<I extends
++totalMsgsSentInSuperstep;
// Add the message to the cache
- int workerMessageCount = sendMessageCache
- .addMessage(workerInfo, partitionId, destVertexId, message);
+ int workerMessageCount = sendMessageCache.addMessage(
+ workerInfo, partitionId, destVertexId, message);
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageCount >= maxMessagesPerWorker) {
Map<Integer, Map<I, Collection<M>>> workerMessages =
- sendMessageCache.removeWorkerMessages(workerInfo);
+ sendMessageCache.removeWorkerMessages(workerInfo);
InetSocketAddress remoteWorkerAddress =
- getInetSocketAddress(workerInfo, partitionId);
+ workerClient.getInetSocketAddress(workerInfo, partitionId);
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+ new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
doRequest(workerInfo, remoteWorkerAddress, writableRequest);
}
}
@@ -281,9 +150,9 @@ public class NettyWorkerClient<I extends
Partition<I, V, E, M> partition) {
final int partitionId = partition.getId();
InetSocketAddress remoteServerAddress =
- getInetSocketAddress(workerInfo, partitionId);
+ workerClient.getInetSocketAddress(workerInfo, partitionId);
if (LOG.isTraceEnabled()) {
- LOG.trace("sendPartitionRequest: Sending to " +
+ LOG.trace("sendVertexRequest: Sending to " +
remoteServerAddress +
" from " + workerInfo + ", with partition " + partition);
}
@@ -295,7 +164,7 @@ public class NettyWorkerClient<I extends
// Messages are stored separately
MessageStoreByPartition<I, M> messageStore =
- service.getServerData().getCurrentMessageStore();
+ serverData.getCurrentMessageStore();
Map<I, Collection<M>> map = Maps.newHashMap();
int messagesInMap = 0;
for (I vertexId :
@@ -306,7 +175,7 @@ public class NettyWorkerClient<I extends
messagesInMap += messages.size();
} catch (IOException e) {
throw new IllegalStateException(
- "sendPartitionReq: Got IOException ", e);
+ "sendVertexRequest: Got IOException ", e);
}
if (messagesInMap > maxMessagesPerWorker) {
WritableRequest messagesRequest = new
@@ -323,35 +192,23 @@ public class NettyWorkerClient<I extends
}
}
- /**
- * Send a mutations request if the maximum number of mutations per partition
- * was met.
- *
- * @param partitionId Partition id
- * @param partitionOwner Owner of the partition
- * @param partitionMutationCount Number of mutations for this partition
- */
- private void sendMutationsRequestIfFull(
- int partitionId, PartitionOwner partitionOwner,
- int partitionMutationCount) {
- // Send a request if enough mutations are there for a partition
- if (partitionMutationCount >= maxMutationsPerPartition) {
- InetSocketAddress remoteServerAddress =
- getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
- Map<I, VertexMutations<I, V, E, M>> partitionMutations =
- sendMutationsCache.removePartitionMutations(partitionId);
- WritableRequest writableRequest =
- new SendPartitionMutationsRequest<I, V, E, M>(
- partitionId, partitionMutations);
- doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
- writableRequest);
+ @Override
+ public void sendVertexRequest(PartitionOwner partitionOwner,
+ Vertex<I, V, E, M> vertex) {
+ Partition<I, V, E, M> partition =
+ sendPartitionCache.addVertex(partitionOwner, vertex);
+ if (partition == null) {
+ return;
}
+
+ sendPartitionRequest(partitionOwner.getWorkerInfo(), partition);
}
@Override
public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner =
+ serviceWorker.getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
@@ -366,10 +223,37 @@ public class NettyWorkerClient<I extends
partitionId, partitionOwner, partitionMutationCount);
}
+ /**
+ * Send a mutations request if the maximum number of mutations per partition
+ * was met.
+ *
+ * @param partitionId Partition id
+ * @param partitionOwner Owner of the partition
+ * @param partitionMutationCount Number of mutations for this partition
+ */
+ private void sendMutationsRequestIfFull(
+ int partitionId, PartitionOwner partitionOwner,
+ int partitionMutationCount) {
+ // Send a request if enough mutations are there for a partition
+ if (partitionMutationCount >= maxMutationsPerPartition) {
+ InetSocketAddress remoteServerAddress =
+ workerClient.getInetSocketAddress(
+ partitionOwner.getWorkerInfo(), partitionId);
+ Map<I, VertexMutations<I, V, E, M>> partitionMutations =
+ sendMutationsCache.removePartitionMutations(partitionId);
+ WritableRequest writableRequest =
+ new SendPartitionMutationsRequest<I, V, E, M>(
+ partitionId, partitionMutations);
+ doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+ writableRequest);
+ }
+ }
+
@Override
public void removeEdgeRequest(I vertexIndex,
I destinationVertexIndex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner =
+ serviceWorker.getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("removeEdgeRequest: Removing edge " +
@@ -388,7 +272,8 @@ public class NettyWorkerClient<I extends
@Override
public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
+ PartitionOwner partitionOwner =
+ serviceWorker.getVertexPartitionOwner(vertex.getId());
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("addVertexRequest: Sending vertex " + vertex +
@@ -405,7 +290,8 @@ public class NettyWorkerClient<I extends
@Override
public void removeVertexRequest(I vertexIndex) throws IOException {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner =
+ serviceWorker.getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("removeVertexRequest: Removing vertex index " +
@@ -422,41 +308,46 @@ public class NettyWorkerClient<I extends
@Override
public void flush() throws IOException {
+ // Execute the remaining send partitions (if any)
+ for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+ sendPartitionCache.getOwnerPartitionMap().entrySet()) {
+ sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
+ }
+ sendPartitionCache.clear();
+
// Execute the remaining sends messages (if any)
- Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> remainingMessageCache =
- sendMessageCache.removeAllMessages();
- for (Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
- remainingMessageCache.entrySet()) {
+ Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
+ remainingMessageCache = sendMessageCache.removeAllMessages();
+ for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
+ remainingMessageCache.entrySet()) {
Iterator<Integer> cachedPartitionId =
- entry.getValue().keySet().iterator();
+ entry.getValue().keySet().iterator();
final int partitionId = cachedPartitionId.hasNext() ?
- cachedPartitionId.next() : NO_PARTITION_ID;
+ cachedPartitionId.next() : NettyWorkerClient.NO_PARTITION_ID;
InetSocketAddress remoteWorkerAddress =
- getInetSocketAddress(entry.getKey(), partitionId);
+ workerClient.getInetSocketAddress(entry.getKey(), partitionId);
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
+ new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
}
// Execute the remaining sends mutations (if any)
Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
sendMutationsCache.removeAllPartitionMutations();
- for (Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
+ for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
remainingMutationsCache.entrySet()) {
WritableRequest writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
entry.getKey(), entry.getValue());
PartitionOwner partitionOwner =
- getVertexPartitionOwner(
+ serviceWorker.getVertexPartitionOwner(
entry.getValue().keySet().iterator().next());
InetSocketAddress remoteServerAddress =
- getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ workerClient.getInetSocketAddress(partitionOwner.getWorkerInfo(),
partitionOwner.getPartitionId());
doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
writableRequest);
}
-
- nettyClient.waitAllRequests();
}
@Override
@@ -467,30 +358,27 @@ public class NettyWorkerClient<I extends
}
@Override
- public void closeConnections() throws IOException {
- nettyClient.stop();
+ public PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return workerClient.getVertexPartitionOwner(vertexId);
}
-/*if[HADOOP_NON_SECURE]
- @Override
- public void setup() {
- fixPartitionIdToSocketAddrMap();
- }
-else[HADOOP_NON_SECURE]*/
- @Override
- public void setup(boolean authenticate) {
- fixPartitionIdToSocketAddrMap();
- if (authenticate) {
- authenticate();
+ /**
+ * When doing the request, short circuit if it is local
+ *
+ * @param workerInfo Worker info
+ * @param remoteServerAddress Remote server address
+ * @param writableRequest Request to either submit or run locally
+ */
+ private void doRequest(WorkerInfo workerInfo,
+ InetSocketAddress remoteServerAddress,
+ WritableRequest writableRequest) {
+ // If this is local, execute locally
+ if (serviceWorker.getWorkerInfo().getTaskId() ==
+ workerInfo.getTaskId()) {
+ ((WorkerRequest) writableRequest).doRequest(serverData);
+ } else {
+ workerClient.sendWritableRequest(
+ workerInfo.getTaskId(), remoteServerAddress, writableRequest);
}
}
-/*end[HADOOP_NON_SECURE]*/
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
- @Override
- public void authenticate() {
- nettyClient.authenticate();
- }
-/*end[HADOOP_NON_SECURE]*/
}