You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in plain text.)
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/01/28 04:37:16 UTC

svn commit: r1236971 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/comm/

Author: aching
Date: Sat Jan 28 03:37:15 2012
New Revision: 1236971

URL: http://svn.apache.org/viewvc?rev=1236971&view=rev
Log:
GIRAPH-128: RPC port from BasicRPCCommunications should be only a
starting port, and retried. (aching)

Added:
    incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/
    incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Jan 28 03:37:15 2012
@@ -2,8 +2,11 @@ Giraph Change Log
 
 Release 0.1.0 - unreleased
 
+  GIRAPH-128: RPC port from BasicRPCCommunications should be only a
+  starting port, and retried. (aching)
+
   GIRAPH-131: Enable creation of test-jars to simplify testing in 
-  downstream projects.  (André Kelpe via jghoman)
+  downstream projects. (André Kelpe via jghoman)
   
   GIRAPH-129: Enable creation of javadoc and sources jars.
   (André Kelpe via jghoman)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Jan 28 03:37:15 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -83,7 +84,7 @@ public abstract class BasicRPCCommunicat
     /** Name of RPC server, == myAddress.toString() */
     private final String myName;
     /** RPC server */
-    private final Server server;
+    private Server server;
     /** Centralized service, needed to get vertex ranges */
     private final CentralizedServiceWorker<I, V, E, M> service;
     /** Hadoop configuration */
@@ -91,7 +92,7 @@ public abstract class BasicRPCCommunicat
     /** Combiner instance, can be null */
     private final VertexCombiner<I, M> combiner;
     /** Address of RPC server */
-    private final InetSocketAddress myAddress;
+    private InetSocketAddress myAddress;
     /** Messages sent during the last superstep */
     private long totalMsgsSentInSuperstep = 0;
     /** Maximum messages sent per putVertexIdMessagesList RPC */
@@ -153,9 +154,9 @@ public abstract class BasicRPCCommunicat
     private final int maxSize;
     /** Cached job id */
     private final String jobId;
-    /** cached job token */
+    /** Cached job token */
     private final J jobToken;
-    /** maximum number of vertices sent in a single RPC */
+    /** Maximum number of vertices sent in a single RPC */
     private static final int MAX_VERTICES_PER_RPC = 1024;
 
     /**
@@ -243,11 +244,11 @@ public abstract class BasicRPCCommunicat
                                 throw new IllegalStateException(
                                         "run: Combiner cannot return null");
                             }
-                            if (Iterables.size(entry.getValue()) < 
+                            if (Iterables.size(entry.getValue()) <
                                     Iterables.size(messages)) {
                                 throw new IllegalStateException(
                                         "run: The number of combined " +
-                                        "messages is required to be <= to " + 
+                                        "messages is required to be <= to " +
                                         "number of messages to be combined");
                             }
                             entry.getValue().clear();
@@ -368,7 +369,7 @@ public abstract class BasicRPCCommunicat
                         throw new IllegalStateException(
                                 "run: Combiner cannot return null");
                     }
-                    if (Iterables.size(outMessageList) < 
+                    if (Iterables.size(outMessageList) <
                             Iterables.size(messages)) {
                         throw new IllegalStateException(
                                 "run: The number of combined messages is " +
@@ -433,12 +434,8 @@ public abstract class BasicRPCCommunicat
         int taskId = conf.getInt("mapred.task.partition", -1);
         int numTasks = conf.getInt("mapred.map.tasks", 1);
 
-        String bindAddress = localHostname;
-        int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
-                                   GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
-                                   taskId;
 
-        this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+
         int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
                                       GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
         if (numTasks < numHandlers) {
@@ -446,11 +443,6 @@ public abstract class BasicRPCCommunicat
         }
         this.jobToken = createJobToken();
         this.jobId = context.getJobID().toString();
-        this.server =
-            getRPCServer(myAddress, numHandlers, this.jobId, this.jobToken);
-        this.server.start();
-
-        this.myName = myAddress.toString();
 
         int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
         // If the number of flush threads is unset, it is set to
@@ -461,14 +453,60 @@ public abstract class BasicRPCCommunicat
                       1);
         this.executor = Executors.newFixedThreadPool(numFlushThreads);
 
+        // Simple handling of port collisions on the same machine while
+        // preserving debugability from the port number alone.
+        // Round up the max number of workers to the next power of 10 and use
+        // it as a constant to increase the port number with.
+        int portIncrementConstant =
+            (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
+        String bindAddress = localHostname;
+        int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
+                                   GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+                                   taskId;
+        int bindAttempts = 0;
+        final int maxRpcPortBindAttempts =
+            conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
+                        GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+        while (bindAttempts < maxRpcPortBindAttempts) {
+            this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+            try {
+                this.server =
+                    getRPCServer(
+                        myAddress, numHandlers, this.jobId, this.jobToken);
+                break;
+            } catch (BindException e) {
+                LOG.info("BasicRPCCommunications: Failed to bind with port " +
+                         bindPort + " on bind attempt " + bindAttempts);
+                ++bindAttempts;
+                bindPort += portIncrementConstant;
+            }
+        }
+        if (bindAttempts == maxRpcPortBindAttempts) {
+            throw new IllegalStateException(
+                "BasicRPCCommunications: Failed to start RPCServer with " +
+                maxRpcPortBindAttempts + " attempts");
+        }
+
+        this.server.start();
+        this.myName = myAddress.toString();
+
         if (LOG.isInfoEnabled()) {
             LOG.info("BasicRPCCommunications: Started RPC " +
                      "communication server: " + myName + " with " +
                      numHandlers + " handlers and " + numFlushThreads +
-                     " flush threads");
+                     " flush threads on bind attempt " + bindAttempts);
         }
     }
 
+    /**
+     * Returns the port the RPC server actually bound to, after any retries.
+     *
+     * @return bound RPC server port
+     */
+    public int getPort() {
+        return this.myAddress.getPort();
+    }
+
     @Override
     public void setup() {
         try {
@@ -997,15 +1035,15 @@ end[HADOOP_FACEBOOK]*/
             for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
                 if (combiner != null) {
                     try {
-                        Iterable<M> messages = 
-                            combiner.combine(entry.getKey(), 
+                        Iterable<M> messages =
+                            combiner.combine(entry.getKey(),
                                              entry.getValue());
                         if (messages == null) {
                             throw new IllegalStateException(
                                     "prepareSuperstep: Combiner cannot " +
                                     "return null");
                         }
-                        if (Iterables.size(entry.getValue()) < 
+                        if (Iterables.size(entry.getValue()) <
                                 Iterables.size(messages)) {
                             throw new IllegalStateException(
                                     "prepareSuperstep: The number of " +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1236971&r1=1236970&r2=1236971&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Jan 28 03:37:15 2012
@@ -151,6 +151,12 @@ public class GiraphJob extends Job {
     /** Default port to start using for the RPC communication */
     public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
 
+    /** Maximum bind attempts for different RPC ports */
+    public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
+        "giraph.maxRpcPortBindAttempts";
+    /** Default maximum bind attempts for different RPC ports */
+    public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+
     /** Maximum number of RPC handlers */
     public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
     /** Default maximum number of RPC handlers */

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java?rev=1236971&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java Sat Jan 28 03:37:15 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import junit.framework.TestCase;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * Tests the RPC port selection done when constructing
+ * {@link RPCCommunications}.
+ */
+public class RPCCommunicationsTest extends TestCase {
+
+    /**
+     * Constructs three RPC communication objects with the same task
+     * partition in one JVM.  The first should bind to the initial RPC port
+     * plus the task partition; each later one collides with the previous
+     * ports and retries at the next multiple of the port increment.
+     *
+     * NOTE(review): the servers are never stopped by this test, so their
+     * ports presumably stay bound for the rest of the JVM, and the expected
+     * values assume those ports are free on the test machine.
+     */
+    public void testDuplicateRpcPort() throws Exception {
+        @SuppressWarnings("rawtypes")
+        Context context = mock(Context.class);
+        Configuration conf = new Configuration();
+        conf.setInt("mapred.task.partition", 9);
+        conf.setInt(GiraphJob.MAX_WORKERS, 13);
+        when(context.getConfiguration()).thenReturn(conf);
+        when(context.getJobID()).thenReturn(new JobID());
+
+        RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+            comm1 =
+                new RPCCommunications<
+                    IntWritable, IntWritable,
+                    IntWritable, IntWritable>(context, null, null);
+        RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+            comm2 =
+                new RPCCommunications<
+                    IntWritable, IntWritable,
+                    IntWritable, IntWritable>(context, null, null);
+        RPCCommunications<IntWritable, IntWritable, IntWritable, IntWritable>
+            comm3 =
+                new RPCCommunications<
+                    IntWritable, IntWritable,
+                    IntWritable, IntWritable>(context, null, null);
+
+        // No initial port is configured, so the default is used, offset by
+        // the task partition (9).  With 13 workers the increment applied on a
+        // bind collision is 100 (13 rounded up to the next power of 10).
+        final int firstPort = GiraphJob.RPC_INITIAL_PORT_DEFAULT + 9;
+        final int portIncrement = 100;
+        // JUnit's assertEquals takes (expected, actual), in that order.
+        assertEquals(firstPort, comm1.getPort());
+        assertEquals(firstPort + portIncrement, comm2.getPort());
+        assertEquals(firstPort + 2 * portIncrement, comm3.getPort());
+    }
+}