You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/06 14:53:27 UTC

git commit: updated refs/heads/trunk to e5a21c4

Updated Branches:
  refs/heads/trunk 3a20c5597 -> e5a21c4bb


GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e5a21c4b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e5a21c4b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e5a21c4b

Branch: refs/heads/trunk
Commit: e5a21c4bb27055cad85f1d9dd494867c0fae8c22
Parents: 3a20c55
Author: Claudio Martella <cl...@gmail.com>
Authored: Wed Nov 6 14:52:45 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Wed Nov 6 14:52:45 2013 +0100

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../java/org/apache/giraph/bsp/BspService.java  |  5 ++--
 .../apache/giraph/conf/GiraphConfiguration.java | 21 ++++++++++++++
 .../org/apache/giraph/conf/GiraphConstants.java | 16 ++++++++++-
 .../apache/giraph/graph/GraphTaskManager.java   | 29 ++++++++------------
 .../apache/giraph/master/BspServiceMaster.java  |  6 ++--
 .../apache/giraph/worker/BspServiceWorker.java  |  4 +--
 .../apache/giraph/yarn/GiraphYarnClient.java    |  6 ++--
 8 files changed, 58 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 04656f0..cd04a2b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio)
+
   GIRAPH-737: Giraph Application Master: Move to new and stable YARN API (mislam via ereisman)
 
   GIRAPH-791: HiveGiraphRunner picks -D options too late (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 34f4b51..86823ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -253,13 +253,11 @@ public abstract class BspService<I extends WritableComparable,
   /**
    * Constructor.
    *
-   * @param serverPortList ZooKeeper server port list
    * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
    */
-  public BspService(String serverPortList,
-      int sessionMsecTimeout,
+  public BspService(int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
     this.vertexInputSplitsEvents = new InputSplitEvents(context);
@@ -322,6 +320,7 @@ public abstract class BspService<I extends WritableComparable,
         CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
             CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
+    String serverPortList = conf.getZookeeperList();
     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
         haltComputationPath);

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index f176bfe..d066513 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -731,6 +731,27 @@ public class GiraphConfiguration extends Configuration
     return get(ZOOKEEPER_LIST);
   }
 
+  /**
+   * Set the ZooKeeper list to the provided list. This method is used when
+   * ZooKeeper is started internally by Giraph, so it also sets the
+   * zkIsExternal option to false.
+   *
+   * @param zkList comma-separated list of ZooKeeper servers
+   */
+  public void setZookeeperList(String zkList) {
+    set(ZOOKEEPER_LIST, zkList);
+    ZOOKEEPER_IS_EXTERNAL.set(this, false);
+  }
+
+  /**
+   * Was ZooKeeper provided externally?
+   *
+   * @return true iff ZooKeeper was provided externally (not started by Giraph)
+   */
+  public boolean isZookeeperExternal() {
+    return ZOOKEEPER_IS_EXTERNAL.get(this);
+  }
+
   public String getLocalLevel() {
     return LOG_LEVEL.get(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 6f32e46..3f379f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -404,10 +404,24 @@ public interface GiraphConstants {
 
   /**
    *  ZooKeeper comma-separated list (if not set,
-   *  will start up ZooKeeper locally)
+   *  will start up ZooKeeper locally). Note that after ZooKeeper is started
+   *  locally, this parameter is updated in the configuration with the actual
+   *  server list.
    */
   String ZOOKEEPER_LIST = "giraph.zkList";
 
+  /**
+   * The ZooKeeper list always holds a value during the computation; this
+   * option records whether ZooKeeper was started internally by Giraph or
+   * provided externally (defaults to true, i.e. externally provided).
+   */
+  BooleanConfOption ZOOKEEPER_IS_EXTERNAL =
+    new BooleanConfOption("giraph.zkIsExternal", true,
+                          "Zookeeper List will always hold a value during " +
+                          "the computation while this option provides " +
+                          "information regarding whether the zookeeper was " +
+                          "internally started or externally provided.");
+
+
   /** ZooKeeper session millisecond timeout */
   IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
       new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1),

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3939d49..f31d99e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -144,8 +144,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private GiraphTimerContext communicationTimerContext;
   /** Timer for WorkerContext#preSuperstep() */
   private GiraphTimer wcPreSuperstepTimer;
-  /** Zookeeper host:port list */
-  private String serverPortList;
   /** The Hadoop Mapper#Context for this job */
   private Mapper<?, ?, ?, ?>.Context context;
   /** is this GraphTaskManager the master? */
@@ -200,7 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     // Do some task setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
     locateZookeeperClasspath(zkPathList);
-    serverPortList = conf.getZookeeperList();
+    String serverPortList = conf.getZookeeperList();
     if (serverPortList == null && startZooKeeperManager()) {
       return; // ZK connect/startup failed
     }
@@ -219,7 +217,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     }
     int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
     try {
-      instantiateBspService(serverPortList, sessionMsecTimeout);
+      instantiateBspService(sessionMsecTimeout);
     } catch (IOException e) {
       LOG.error("setup: Caught exception just before end of setup", e);
       if (zkManager != null) {
@@ -369,7 +367,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       return true;
     }
     zkManager.onlineZooKeeperServers();
-    serverPortList = zkManager.getZooKeeperServerPortString();
+    String serverPortList = zkManager.getZooKeeperServerPortString();
+    conf.setZookeeperList(serverPortList);
     context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
         serverPortList);
     return false;
@@ -493,9 +492,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    *    ZooKeeper.
    * 2) If split master/worker, masters also run ZooKeeper
    *
-   * 3) If split master/worker == true and <code>giraph.zkList</code> is set,
-   *    the master will not instantiate a ZK instance, but will assume
-   *    a quorum is already active on the cluster for Giraph to use.
+   * 3) If split master/worker == true and <code>giraph.zkList</code> is
+   *    externally provided, the master will not instantiate a ZK instance, but
+   *    will assume a quorum is already active on the cluster for Giraph to use.
    *
    * @param conf Configuration to use
    * @param zkManager ZooKeeper manager to help determine whether to run
@@ -507,7 +506,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       ZooKeeperManager zkManager) {
     boolean splitMasterWorker = conf.getSplitMasterWorker();
     int taskPartition = conf.getTaskPartition();
-    boolean zkAlreadyProvided = conf.getZookeeperList() != null;
+    boolean zkAlreadyProvided = conf.isZookeeperExternal();
     GraphFunctions functions = GraphFunctions.UNKNOWN;
     // What functions should this mapper do?
     if (!splitMasterWorker) {
@@ -538,18 +537,17 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /**
    * Instantiate the appropriate BspService object (Master or Worker)
    * for this compute node.
-   * @param serverPortList host:port list for connecting to ZK quorum
    * @param sessionMsecTimeout configurable session timeout
    */
-  private void instantiateBspService(String serverPortList,
-    int sessionMsecTimeout) throws IOException, InterruptedException {
+  private void instantiateBspService(int sessionMsecTimeout)
+    throws IOException, InterruptedException {
     if (graphFunctions.isMaster()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Starting up BspServiceMaster " +
           "(master thread)...");
       }
       serviceMaster = new BspServiceMaster<I, V, E>(
-        serverPortList, sessionMsecTimeout, context, this);
+        sessionMsecTimeout, context, this);
       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
       masterThread.start();
     }
@@ -558,10 +556,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
         LOG.info("setup: Starting up BspServiceWorker...");
       }
       serviceWorker = new BspServiceWorker<I, V, E>(
-        serverPortList,
-        sessionMsecTimeout,
-        context,
-        this);
+        sessionMsecTimeout, context, this);
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Registering health of this worker...");
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index f043c61..baa8434 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -187,17 +187,15 @@ public class BspServiceMaster<I extends WritableComparable,
   /**
    * Constructor for setting up the master.
    *
-   * @param serverPortList ZooKeeper server port list
    * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
    */
   public BspServiceMaster(
-      String serverPortList,
       int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
-    super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+    super(sessionMsecTimeout, context, graphTaskManager);
     workerWroteCheckpoint = new PredicateLock(context);
     registerBspEvent(workerWroteCheckpoint);
     superstepStateChanged = new PredicateLock(context);
@@ -1768,7 +1766,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // and the master can do any final cleanup if the ZooKeeper service was
     // provided (not dynamically started) and we don't want to keep the data
     try {
-      if (getConfiguration().getZookeeperList() != null &&
+      if (getConfiguration().isZookeeperExternal() &&
           KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
         if (LOG.isInfoEnabled()) {
           LOG.info("cleanupZooKeeper: Removing the following path " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index a92ddf8..f6da680 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -169,7 +169,6 @@ public class BspServiceWorker<I extends WritableComparable,
   /**
    * Constructor for setting up the worker.
    *
-   * @param serverPortList ZooKeeper server port list
    * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
@@ -177,12 +176,11 @@ public class BspServiceWorker<I extends WritableComparable,
    * @throws InterruptedException
    */
   public BspServiceWorker(
-    String serverPortList,
     int sessionMsecTimeout,
     Mapper<?, ?, ?, ?>.Context context,
     GraphTaskManager<I, V, E> graphTaskManager)
     throws IOException, InterruptedException {
-    super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+    super(sessionMsecTimeout, context, graphTaskManager);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);

http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
index ab6564e..e8926eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -455,11 +455,12 @@ public class GiraphYarnClient {
    * removed as we expand the functionality of the "pure YARN" Giraph profile.
    */
   private void checkJobLocalZooKeeperSupported() {
+    final boolean isZkExternal = giraphConf.isZookeeperExternal();
     final String checkZkList = giraphConf.getZookeeperList();
-    if (checkZkList == null || checkZkList.isEmpty()) {
+    // zkIsExternal defaults to true, so an unset zkList must be checked too
+    if (!isZkExternal || checkZkList == null || checkZkList.isEmpty()) {
       throw new IllegalArgumentException("Giraph on YARN does not currently" +
-        "support Giraph-managed ZK instances: use a standalone ZooKeeper: '" +
-        checkZkList + "'");
+          " support Giraph-managed ZK instances: use a standalone ZooKeeper.");
     }
   }