You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/01/28 04:37:16 UTC
svn commit: r1236971 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/
src/test/java/org/apache/giraph/comm/
Author: aching
Date: Sat Jan 28 03:37:15 2012
New Revision: 1236971
URL: http://svn.apache.org/viewvc?rev=1236971&view=rev
Log:
GIRAPH-128: RPC port from BasicRPCCommunications should be only a
starting port, and retried. (aching)
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/
incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Jan 28 03:37:15 2012
@@ -2,8 +2,11 @@ Giraph Change Log
Release 0.1.0 - unreleased
+ GIRAPH-128: RPC port from BasicRPCCommunications should be only a
+ starting port, and retried. (aching)
+
GIRAPH-131: Enable creation of test-jars to simplify testing in
- downstream projects. (André Kelpe via jghoman)
+ downstream projects. (André Kelpe via jghoman)
GIRAPH-129: Enable creation of javadoc and sources jars.
(André Kelpe via jghoman)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Jan 28 03:37:15 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -83,7 +84,7 @@ public abstract class BasicRPCCommunicat
/** Name of RPC server, == myAddress.toString() */
private final String myName;
/** RPC server */
- private final Server server;
+ private Server server;
/** Centralized service, needed to get vertex ranges */
private final CentralizedServiceWorker<I, V, E, M> service;
/** Hadoop configuration */
@@ -91,7 +92,7 @@ public abstract class BasicRPCCommunicat
/** Combiner instance, can be null */
private final VertexCombiner<I, M> combiner;
/** Address of RPC server */
- private final InetSocketAddress myAddress;
+ private InetSocketAddress myAddress;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
/** Maximum messages sent per putVertexIdMessagesList RPC */
@@ -153,9 +154,9 @@ public abstract class BasicRPCCommunicat
private final int maxSize;
/** Cached job id */
private final String jobId;
- /** cached job token */
+ /** Cached job token */
private final J jobToken;
- /** maximum number of vertices sent in a single RPC */
+ /** Maximum number of vertices sent in a single RPC */
private static final int MAX_VERTICES_PER_RPC = 1024;
/**
@@ -243,11 +244,11 @@ public abstract class BasicRPCCommunicat
throw new IllegalStateException(
"run: Combiner cannot return null");
}
- if (Iterables.size(entry.getValue()) <
+ if (Iterables.size(entry.getValue()) <
Iterables.size(messages)) {
throw new IllegalStateException(
"run: The number of combined " +
- "messages is required to be <= to " +
+ "messages is required to be <= to " +
"number of messages to be combined");
}
entry.getValue().clear();
@@ -368,7 +369,7 @@ public abstract class BasicRPCCommunicat
throw new IllegalStateException(
"run: Combiner cannot return null");
}
- if (Iterables.size(outMessageList) <
+ if (Iterables.size(outMessageList) <
Iterables.size(messages)) {
throw new IllegalStateException(
"run: The number of combined messages is " +
@@ -433,12 +434,8 @@ public abstract class BasicRPCCommunicat
int taskId = conf.getInt("mapred.task.partition", -1);
int numTasks = conf.getInt("mapred.map.tasks", 1);
- String bindAddress = localHostname;
- int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
- taskId;
- this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+
int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
if (numTasks < numHandlers) {
@@ -446,11 +443,6 @@ public abstract class BasicRPCCommunicat
}
this.jobToken = createJobToken();
this.jobId = context.getJobID().toString();
- this.server =
- getRPCServer(myAddress, numHandlers, this.jobId, this.jobToken);
- this.server.start();
-
- this.myName = myAddress.toString();
int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
// If the number of flush threads is unset, it is set to
@@ -461,14 +453,60 @@ public abstract class BasicRPCCommunicat
1);
this.executor = Executors.newFixedThreadPool(numFlushThreads);
+ // Simple handling of port collisions on the same machine while
+ // preserving debuggability from the port number alone.
+ // Round up the max number of workers to the next power of 10 and use
+ // it as the step by which the port number grows on each retry.
+ int portIncrementConstant =
+ (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
+ String bindAddress = localHostname;
+ int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
+ GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+ taskId;
+ int bindAttempts = 0;
+ final int maxRpcPortBindAttempts =
+ conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
+ GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ while (bindAttempts < maxRpcPortBindAttempts) {
+ this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+ try {
+ this.server =
+ getRPCServer(
+ myAddress, numHandlers, this.jobId, this.jobToken);
+ break;
+ } catch (BindException e) {
+ LOG.info("BasicRPCCommunications: Failed to bind with port " +
+ bindPort + " on bind attempt " + bindAttempts);
+ ++bindAttempts;
+ bindPort += portIncrementConstant;
+ }
+ }
+ if (bindAttempts == maxRpcPortBindAttempts) {
+ throw new IllegalStateException(
+ "BasicRPCCommunications: Failed to start RPCServer with " +
+ maxRpcPortBindAttempts + " attempts");
+ }
+
+ this.server.start();
+ this.myName = myAddress.toString();
+
if (LOG.isInfoEnabled()) {
LOG.info("BasicRPCCommunications: Started RPC " +
"communication server: " + myName + " with " +
numHandlers + " handlers and " + numFlushThreads +
- " flush threads");
+ " flush threads on bind attempt " + bindAttempts);
}
}
+ /**
+ * Get the port that the RPC server finally bound to.
+ *
+ * @return Port that RPC server was bound to.
+ */
+ public int getPort() {
+ return myAddress.getPort();
+ }
+
@Override
public void setup() {
try {
@@ -997,15 +1035,15 @@ end[HADOOP_FACEBOOK]*/
for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
if (combiner != null) {
try {
- Iterable<M> messages =
- combiner.combine(entry.getKey(),
+ Iterable<M> messages =
+ combiner.combine(entry.getKey(),
entry.getValue());
if (messages == null) {
throw new IllegalStateException(
"prepareSuperstep: Combiner cannot " +
"return null");
}
- if (Iterables.size(entry.getValue()) <
+ if (Iterables.size(entry.getValue()) <
Iterables.size(messages)) {
throw new IllegalStateException(
"prepareSuperstep: The number of " +
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Jan 28 03:37:15 2012
@@ -151,6 +151,12 @@ public class GiraphJob extends Job {
/** Default port to start using for the RPC communication */
public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
+ /** Maximum bind attempts for different RPC ports */
+ public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
+ "giraph.maxRpcPortBindAttempts";
+ /** Default maximum bind attempts for different RPC ports */
+ public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+
/** Maximum number of RPC handlers */
public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
/** Default maximum number of RPC handlers */
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java?rev=1236971&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java Sat Jan 28 03:37:15 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import junit.framework.TestCase;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+public class RPCCommunicationsTest extends TestCase {
+
+ public void testDuplicateRpcPort() throws Exception {
+ @SuppressWarnings("rawtypes")
+ Context context = mock(Context.class);
+ Configuration conf = new Configuration();
+ conf.setInt("mapred.task.partition", 9);
+ conf.setInt(GiraphJob.MAX_WORKERS, 13);
+ when(context.getConfiguration()).thenReturn(conf);
+ when(context.getJobID()).thenReturn(new JobID());
+
+ RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+ comm1 =
+ new RPCCommunications<
+ IntWritable, IntWritable,
+ IntWritable, IntWritable>(context, null, null);
+ RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+ comm2 =
+ new RPCCommunications<
+ IntWritable, IntWritable,
+ IntWritable, IntWritable>(context, null, null);
+ RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+ comm3 =
+ new RPCCommunications<
+ IntWritable, IntWritable,
+ IntWritable, IntWritable>(context, null, null);
+ assertEquals(comm1.getPort(), 30009);
+ assertEquals(comm2.getPort(), 30109);
+ assertEquals(comm3.getPort(), 30209);
+ }
+}