You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:53 UTC

[10/47] git commit: updated refs/heads/release-1.1 to 4c139ee

GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7f9218ae
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7f9218ae
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7f9218ae

Branch: refs/heads/release-1.1
Commit: 7f9218aeb6410929ddada81b4fabb17bf8636a4c
Parents: 4223ccc
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Jul 8 10:58:09 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Jul 8 10:59:50 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../giraph/utils/InternalVertexRunner.java      | 63 ++++++++++++++++----
 2 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ea2f911..834b45f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
+
   GIRAPH-713: Provide an option to do request compression (pavanka)
 
   GIRAPH-923: Upgrade Netty version to a latest stable one (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 09dd46d..2c4606f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -38,6 +38,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -55,8 +56,10 @@ import java.util.concurrent.TimeUnit;
  */
 @SuppressWarnings("unchecked")
 public class InternalVertexRunner {
-  /** ZooKeeper port to use for tests */
-  public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+  /** Lowest ZooKeeper port to try for tests (inclusive) */
+  public static final int LOCAL_ZOOKEEPER_PORT_FROM = 22182;
+  /** Upper bound of ZooKeeper ports to try for tests (exclusive) */
+  public static final int LOCAL_ZOOKEEPER_PORT_TO = 65535;
 
   /** Logger */
   private static final Logger LOG =
@@ -166,11 +169,13 @@ public class InternalVertexRunner {
         FileUtils.writeLines(edgeInputFile, edgeInputData);
       }
 
+      int localZookeeperPort = findAvailablePort();
+
       conf.setWorkerConfiguration(1, 1, 100.0f);
       GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
       GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
       conf.setZookeeperList("localhost:" +
-        String.valueOf(LOCAL_ZOOKEEPER_PORT));
+          String.valueOf(localZookeeperPort));
 
       conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
       GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
@@ -190,10 +195,10 @@ public class InternalVertexRunner {
             new Path(edgeInputFile.toString()));
       }
       FileOutputFormat.setOutputPath(job.getInternalJob(),
-                                     new Path(outputDir.toString()));
+          new Path(outputDir.toString()));
 
       // Configure a local zookeeper instance
-      Properties zkProperties = configLocalZooKeeper(zkDir);
+      Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
 
       QuorumPeerConfig qpConfig = new QuorumPeerConfig();
       qpConfig.parseProperties(zkProperties);
@@ -227,8 +232,8 @@ public class InternalVertexRunner {
    * @throws Exception if anything goes wrong
    */
   public static <I extends WritableComparable,
-    V extends Writable,
-    E extends Writable> void run(
+      V extends Writable,
+      E extends Writable> void run(
       GiraphConfiguration conf,
       TestGraph<I, V, E> graph) throws Exception {
     File tmpDir = null;
@@ -247,11 +252,13 @@ public class InternalVertexRunner {
 
       InMemoryVertexInputFormat.setGraph(graph);
 
+      int localZookeeperPort = findAvailablePort();
+
       conf.setWorkerConfiguration(1, 1, 100.0f);
       GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
       GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
       GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
-          String.valueOf(LOCAL_ZOOKEEPER_PORT));
+          String.valueOf(localZookeeperPort));
 
       conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
       GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
@@ -259,7 +266,7 @@ public class InternalVertexRunner {
       GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
 
       // Configure a local zookeeper instance
-      Properties zkProperties = configLocalZooKeeper(zkDir);
+      Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
 
       QuorumPeerConfig qpConfig = new QuorumPeerConfig();
       qpConfig.parseProperties(zkProperties);
@@ -298,14 +305,16 @@ public class InternalVertexRunner {
    * Configuration options for running local ZK.
    *
    * @param zkDir directory for ZK to hold files in.
+   * @param zookeeperPort port zookeeper will listen on
    * @return Properties configured for local ZK.
    */
-  private static Properties configLocalZooKeeper(File zkDir) {
+  private static Properties configLocalZooKeeper(File zkDir,
+                                                 int zookeeperPort) {
     Properties zkProperties = new Properties();
     zkProperties.setProperty("tickTime", "2000");
     zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
     zkProperties.setProperty("clientPort",
-        String.valueOf(LOCAL_ZOOKEEPER_PORT));
+        String.valueOf(zookeeperPort));
     zkProperties.setProperty("maxClientCnxns", "10000");
     zkProperties.setProperty("minSessionTimeout", "10000");
     zkProperties.setProperty("maxSessionTimeout", "100000");
@@ -316,6 +325,38 @@ public class InternalVertexRunner {
   }
 
   /**
+   * Scans for an available port. Returns the first port on which
+   * we can open a server socket.
+   * Note: if another process opened a port with SO_REUSEPORT, then this
+   * function may return a port that is in use. This has been observed
+   * with NetCat on Mac.
+   * @return available port
+   */
+  private static int findAvailablePort() {
+    for (int port = LOCAL_ZOOKEEPER_PORT_FROM;
+         port < LOCAL_ZOOKEEPER_PORT_TO; port++) {
+      ServerSocket ss = null;
+      try {
+        ss = new ServerSocket(port);
+        ss.setReuseAddress(true);
+        return port;
+      } catch (IOException e) {
+        LOG.info("findAvailablePort: port " + port + " is in use.");
+      } finally {
+        if (ss != null && !ss.isClosed()) {
+          try {
+            ss.close();
+          } catch (IOException e) {
+            LOG.info("findAvailablePort: can't close test socket", e);
+          }
+        }
+      }
+    }
+    throw new RuntimeException("No port found in the range [ " +
+        LOCAL_ZOOKEEPER_PORT_FROM + ", " + LOCAL_ZOOKEEPER_PORT_TO + ")");
+  }
+
+  /**
    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
    */
   private static class InternalZooKeeper extends ZooKeeperServerMain {