You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:53 UTC
[10/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7f9218ae
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7f9218ae
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7f9218ae
Branch: refs/heads/release-1.1
Commit: 7f9218aeb6410929ddada81b4fabb17bf8636a4c
Parents: 4223ccc
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Jul 8 10:58:09 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Jul 8 10:59:50 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/utils/InternalVertexRunner.java | 63 ++++++++++++++++----
2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ea2f911..834b45f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
+
GIRAPH-713: Provide an option to do request compression (pavanka)
GIRAPH-923: Upgrade Netty version to a latest stable one (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 09dd46d..2c4606f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -38,6 +38,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,8 +56,10 @@ import java.util.concurrent.TimeUnit;
*/
@SuppressWarnings("unchecked")
public class InternalVertexRunner {
- /** ZooKeeper port to use for tests */
- public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+ /** First ZooKeeper port to try for tests (inclusive start of the scan range) */
+ public static final int LOCAL_ZOOKEEPER_PORT_FROM = 22182;
+ /** End of the ZooKeeper port range for tests (exclusive; this port is never tried) */
+ public static final int LOCAL_ZOOKEEPER_PORT_TO = 65535;
/** Logger */
private static final Logger LOG =
@@ -166,11 +169,13 @@ public class InternalVertexRunner {
FileUtils.writeLines(edgeInputFile, edgeInputData);
}
+ int localZookeeperPort = findAvailablePort();
+
conf.setWorkerConfiguration(1, 1, 100.0f);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
conf.setZookeeperList("localhost:" +
- String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ String.valueOf(localZookeeperPort));
conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
@@ -190,10 +195,10 @@ public class InternalVertexRunner {
new Path(edgeInputFile.toString()));
}
FileOutputFormat.setOutputPath(job.getInternalJob(),
- new Path(outputDir.toString()));
+ new Path(outputDir.toString()));
// Configure a local zookeeper instance
- Properties zkProperties = configLocalZooKeeper(zkDir);
+ Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
QuorumPeerConfig qpConfig = new QuorumPeerConfig();
qpConfig.parseProperties(zkProperties);
@@ -227,8 +232,8 @@ public class InternalVertexRunner {
* @throws Exception if anything goes wrong
*/
public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable> void run(
+ V extends Writable,
+ E extends Writable> void run(
GiraphConfiguration conf,
TestGraph<I, V, E> graph) throws Exception {
File tmpDir = null;
@@ -247,11 +252,13 @@ public class InternalVertexRunner {
InMemoryVertexInputFormat.setGraph(graph);
+ int localZookeeperPort = findAvailablePort();
+
conf.setWorkerConfiguration(1, 1, 100.0f);
GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
- String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ String.valueOf(localZookeeperPort));
conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
@@ -259,7 +266,7 @@ public class InternalVertexRunner {
GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
// Configure a local zookeeper instance
- Properties zkProperties = configLocalZooKeeper(zkDir);
+ Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
QuorumPeerConfig qpConfig = new QuorumPeerConfig();
qpConfig.parseProperties(zkProperties);
@@ -298,14 +305,16 @@ public class InternalVertexRunner {
* Configuration options for running local ZK.
*
* @param zkDir directory for ZK to hold files in.
+ * @param zookeeperPort port zookeeper will listen on
* @return Properties configured for local ZK.
*/
- private static Properties configLocalZooKeeper(File zkDir) {
+ private static Properties configLocalZooKeeper(File zkDir,
+ int zookeeperPort) {
Properties zkProperties = new Properties();
zkProperties.setProperty("tickTime", "2000");
zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
zkProperties.setProperty("clientPort",
- String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ String.valueOf(zookeeperPort));
zkProperties.setProperty("maxClientCnxns", "10000");
zkProperties.setProperty("minSessionTimeout", "10000");
zkProperties.setProperty("maxSessionTimeout", "100000");
@@ -316,6 +325,38 @@ public class InternalVertexRunner {
}
/**
+ * Scans [LOCAL_ZOOKEEPER_PORT_FROM, LOCAL_ZOOKEEPER_PORT_TO) in ascending
+ * order and returns the first port on which a ServerSocket can be opened.
+ * The probe socket is closed before returning, so the port is not reserved.
+ * Note: if another process opened port with SO_REUSEPORT then this
+ * function may return port that is in use (observed with NetCat on Mac).
+ * @return available port; a RuntimeException is thrown if none is found
+ */
+ private static int findAvailablePort() {
+ for (int port = LOCAL_ZOOKEEPER_PORT_FROM;
+ port < LOCAL_ZOOKEEPER_PORT_TO; port++) {
+ ServerSocket ss = null;
+ try {
+ ss = new ServerSocket(port);
+ ss.setReuseAddress(true); // NOTE(review): socket is already bound here; effect after bind is unspecified - confirm intent
+ return port; // the finally block below still runs and closes the probe socket
+ } catch (IOException e) {
+ LOG.info("findAvailablePort: port " + port + " is in use.");
+ } finally {
+ if (ss != null && !ss.isClosed()) {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ LOG.info("findAvailablePort: can't close test socket", e);
+ }
+ }
+ }
+ }
+ throw new RuntimeException("No port found in the range [ " +
+ LOCAL_ZOOKEEPER_PORT_FROM + ", " + LOCAL_ZOOKEEPER_PORT_TO + ")");
+ }
+
+ /**
* Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
*/
private static class InternalZooKeeper extends ZooKeeperServerMain {