You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/03/17 17:57:57 UTC

svn commit: r1301962 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/examples/

Author: aching
Date: Sat Mar 17 16:57:56 2012
New Revision: 1301962

URL: http://svn.apache.org/viewvc?rev=1301962&view=rev
Log:
GIRAPH-154: Worker ports are not synched properly with its peers
(Zhiwei Gu via aching).

Added:
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Mar 17 16:57:56 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-154: Worker ports are not synched properly with its peers
+  (Zhiwei Gu via aching).
+
   GIRAPH-87: Simplify boolean expression in
   BspService::checkpointFrequencyMet (Eli Reisman via aching).
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Mar 17 16:57:56 2012
@@ -494,8 +494,20 @@ public abstract class BasicRPCCommunicat
     final int maxRpcPortBindAttempts =
         conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
             GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+    final boolean failFirstPortBindingAttempt =
+        conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+            GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
     while (bindAttempts < maxRpcPortBindAttempts) {
       this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+      if (failFirstPortBindingAttempt && bindAttempts == 0) {
+        LOG.info("BasicRPCCommunications: Intentionally fail first " +
+            "binding attempt as giraph.failFirstRpcPortBindAttempt " +
+            "is true, port " + bindPort);
+        ++bindAttempts;
+        bindPort += portIncrementConstant;
+        continue;
+      }
+
       try {
         this.server =
             getRPCServer(
@@ -508,7 +520,7 @@ public abstract class BasicRPCCommunicat
         bindPort += portIncrementConstant;
       }
     }
-    if (bindAttempts == maxRpcPortBindAttempts) {
+    if (bindAttempts == maxRpcPortBindAttempts || this.server == null) {
       throw new IllegalStateException(
           "BasicRPCCommunications: Failed to start RPCServer with " +
               maxRpcPortBindAttempts + " attempts");

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Mar 17 16:57:56 2012
@@ -138,10 +138,6 @@ public class BspServiceWorker<I extends 
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
     registerBspEvent(partitionExchangeChildrenChanged);
-    int finalRpcPort =
-        getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
-            GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
-            getTaskPartition();
     maxVerticesPerPartition =
         getConfiguration().getInt(
             GiraphJob.MAX_VERTICES_PER_PARTITION,
@@ -150,12 +146,13 @@ public class BspServiceWorker<I extends 
         getConfiguration().getLong(
             GiraphJob.INPUT_SPLIT_MAX_VERTICES,
             GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
-    workerInfo =
-        new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    commService = new RPCCommunications<I, V, E, M>(
-        context, this, graphState);
+    RPCCommunications<I, V, E, M> rpcCommService =
+        new RPCCommunications<I, V, E, M>(context, this, graphState);
+    workerInfo = new WorkerInfo(
+        getHostname(), getTaskPartition(), rpcCommService.getPort());
+    commService = rpcCommService;
     graphState.setWorkerCommunications(commService);
     this.workerContext =
         BspUtils.createWorkerContext(getConfiguration(),

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Mar 17 16:57:56 2012
@@ -160,6 +160,14 @@ public class GiraphJob extends Job {
       "giraph.maxRpcPortBindAttempts";
   /** Default maximum bind attempts for different RPC ports */
   public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+  /**
+   * Fail the first RPC port binding attempt, to simulate a binding
+   * failure when testing on a real grid
+   */
+  public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
+      "giraph.failFirstRpcPortBindAttempt";
+  /** Default fail first RPC port binding attempt flag */
+  public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
 
   /** Maximum number of RPC handlers */
   public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java?rev=1301962&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java Sat Mar 17 16:57:56 2012
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.InternalVertexRunner;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+
+/**
+ * Tests that a job completes when the first RPC port bind attempt fails.
+ */
+public class TryMultiRpcBindingPortsTest extends TestCase {
+
+    /**
+     * Local integration test on toy data with the first bind attempt failing
+     */
+    public void testToyData() throws Exception {
+
+        // a small graph with three components: {1..5,12,13}, {6..8,10,11}, {9}
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+
+                "9" };
+
+        // run in-process via InternalVertexRunner, with the job told to
+        // fail the first RPC port binding attempt
+        Map<String, String> params = Maps.<String, String>newHashMap();
+        params.put(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT, "true");
+        Iterable<String> results = InternalVertexRunner.run(
+                ConnectedComponentsVertex.class,
+                MinimumIntCombiner.class,
+                IntIntNullIntTextInputFormat.class,
+                VertexWithComponentTextOutputFormat.class,
+                params, 
+                graph);
+
+        SetMultimap<Integer,Integer> components = parseResults(results);
+
+        Set<Integer> componentIDs = components.keySet();
+        assertEquals(3, componentIDs.size());
+        assertTrue(componentIDs.contains(1));
+        assertTrue(componentIDs.contains(6));
+        assertTrue(componentIDs.contains(9));
+
+        Set<Integer> componentOne = components.get(1);
+        assertEquals(7, componentOne.size());
+        assertTrue(componentOne.contains(1));
+        assertTrue(componentOne.contains(2));
+        assertTrue(componentOne.contains(3));
+        assertTrue(componentOne.contains(4));
+        assertTrue(componentOne.contains(5));
+        assertTrue(componentOne.contains(12));
+        assertTrue(componentOne.contains(13));
+
+        Set<Integer> componentTwo = components.get(6);
+        assertEquals(5, componentTwo.size());
+        assertTrue(componentTwo.contains(6));
+        assertTrue(componentTwo.contains(7));
+        assertTrue(componentTwo.contains(8));
+        assertTrue(componentTwo.contains(10));
+        assertTrue(componentTwo.contains(11));
+
+        Set<Integer> componentThree = components.get(9);
+        assertEquals(1, componentThree.size());
+        assertTrue(componentThree.contains(9));
+    }
+    /** Maps component id to vertex ids from "vertex TAB component" lines. */
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        SetMultimap<Integer,Integer> components = HashMultimap.create();
+        for (String result : results) {
+            Iterable<String> parts = Splitter.on('\t').split(result);
+            int vertex = Integer.parseInt(Iterables.get(parts, 0));
+            int component = Integer.parseInt(Iterables.get(parts, 1));
+            components.put(component, vertex);
+        }
+        return components;
+    }
+}