You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/03/17 17:57:57 UTC
svn commit: r1301962 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/
src/test/java/org/apache/giraph/examples/
Author: aching
Date: Sat Mar 17 16:57:56 2012
New Revision: 1301962
URL: http://svn.apache.org/viewvc?rev=1301962&view=rev
Log:
GIRAPH-154: Worker ports are not synched properly with its peers
(Zhiwei Gu via aching).
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Mar 17 16:57:56 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-154: Worker ports are not synched properly with its peers
+ (Zhiwei Gu via aching).
+
GIRAPH-87: Simplify boolean expression in
BspService::checkpointFrequencyMet (Eli Reisman via aching).
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Mar 17 16:57:56 2012
@@ -494,8 +494,20 @@ public abstract class BasicRPCCommunicat
final int maxRpcPortBindAttempts =
conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ final boolean failFirstPortBindingAttempt =
+ conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+ GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
while (bindAttempts < maxRpcPortBindAttempts) {
this.myAddress = new InetSocketAddress(bindAddress, bindPort);
+ if (failFirstPortBindingAttempt && bindAttempts == 0) {
+ LOG.info("BasicRPCCommunications: Intentionally fail first " +
+ "binding attempt as giraph.failFirstRpcPortBindAttempt " +
+ "is true, port " + bindPort);
+ ++bindAttempts;
+ bindPort += portIncrementConstant;
+ continue;
+ }
+
try {
this.server =
getRPCServer(
@@ -508,7 +520,7 @@ public abstract class BasicRPCCommunicat
bindPort += portIncrementConstant;
}
}
- if (bindAttempts == maxRpcPortBindAttempts) {
+ if (bindAttempts == maxRpcPortBindAttempts || this.server == null) {
throw new IllegalStateException(
"BasicRPCCommunications: Failed to start RPCServer with " +
maxRpcPortBindAttempts + " attempts");
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Mar 17 16:57:56 2012
@@ -138,10 +138,6 @@ public class BspServiceWorker<I extends
throws IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
registerBspEvent(partitionExchangeChildrenChanged);
- int finalRpcPort =
- getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
- getTaskPartition();
maxVerticesPerPartition =
getConfiguration().getInt(
GiraphJob.MAX_VERTICES_PER_PARTITION,
@@ -150,12 +146,13 @@ public class BspServiceWorker<I extends
getConfiguration().getLong(
GiraphJob.INPUT_SPLIT_MAX_VERTICES,
GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
- workerInfo =
- new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
- commService = new RPCCommunications<I, V, E, M>(
- context, this, graphState);
+ RPCCommunications<I, V, E, M> rpcCommService =
+ new RPCCommunications<I, V, E, M>(context, this, graphState);
+ workerInfo = new WorkerInfo(
+ getHostname(), getTaskPartition(), rpcCommService.getPort());
+ commService = rpcCommService;
graphState.setWorkerCommunications(commService);
this.workerContext =
BspUtils.createWorkerContext(getConfiguration(),
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1301962&r1=1301961&r2=1301962&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Mar 17 16:57:56 2012
@@ -160,6 +160,14 @@ public class GiraphJob extends Job {
"giraph.maxRpcPortBindAttempts";
/** Default maximum bind attempts for different RPC ports */
public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+ /**
+ * Fail first RPC port binding attempt, simulate binding failure
+ * on real grid testing
+ */
+ public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
+ "giraph.failFirstRpcPortBindAttempt";
+ /** Default fail first RPC port binding attempt flag */
+ public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
/** Maximum number of RPC handlers */
public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java?rev=1301962&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java Sat Mar 17 16:57:56 2012
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.InternalVertexRunner;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+
+/**
+ * Tests for {@link TryMultiRpcBindingPortsTest}
+ */
+public class TryMultiRpcBindingPortsTest extends TestCase {
+
+ /**
+ * A local integration test on toy data
+ */
+ public void testToyData() throws Exception {
+
+ // a small graph with three components
+ String[] graph = new String[] {
+ "1 2 3",
+ "2 1 4 5",
+ "3 1 4",
+ "4 2 3 5 13",
+ "5 2 4 12 13",
+ "12 5 13",
+ "13 4 5 12",
+
+ "6 7 8",
+ "7 6 10 11",
+ "8 6 10",
+ "10 7 8 11",
+ "11 7 10",
+
+ "9" };
+
+ // run internally
+ // fail the first port binding attempt
+ Map<String, String> params = Maps.<String, String>newHashMap();
+ params.put(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT, "true");
+ Iterable<String> results = InternalVertexRunner.run(
+ ConnectedComponentsVertex.class,
+ MinimumIntCombiner.class,
+ IntIntNullIntTextInputFormat.class,
+ VertexWithComponentTextOutputFormat.class,
+ params,
+ graph);
+
+ SetMultimap<Integer,Integer> components = parseResults(results);
+
+ Set<Integer> componentIDs = components.keySet();
+ assertEquals(3, componentIDs.size());
+ assertTrue(componentIDs.contains(1));
+ assertTrue(componentIDs.contains(6));
+ assertTrue(componentIDs.contains(9));
+
+ Set<Integer> componentOne = components.get(1);
+ assertEquals(7, componentOne.size());
+ assertTrue(componentOne.contains(1));
+ assertTrue(componentOne.contains(2));
+ assertTrue(componentOne.contains(3));
+ assertTrue(componentOne.contains(4));
+ assertTrue(componentOne.contains(5));
+ assertTrue(componentOne.contains(12));
+ assertTrue(componentOne.contains(13));
+
+ Set<Integer> componentTwo = components.get(6);
+ assertEquals(5, componentTwo.size());
+ assertTrue(componentTwo.contains(6));
+ assertTrue(componentTwo.contains(7));
+ assertTrue(componentTwo.contains(8));
+ assertTrue(componentTwo.contains(10));
+ assertTrue(componentTwo.contains(11));
+
+ Set<Integer> componentThree = components.get(9);
+ assertEquals(1, componentThree.size());
+ assertTrue(componentThree.contains(9));
+ }
+
+ private SetMultimap<Integer,Integer> parseResults(
+ Iterable<String> results) {
+ SetMultimap<Integer,Integer> components = HashMultimap.create();
+ for (String result : results) {
+ Iterable<String> parts = Splitter.on('\t').split(result);
+ int vertex = Integer.parseInt(Iterables.get(parts, 0));
+ int component = Integer.parseInt(Iterables.get(parts, 1));
+ components.put(component, vertex);
+ }
+ return components;
+ }
+}