You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/09/16 20:17:11 UTC

git commit: updated refs/heads/trunk to 7d4b725

Updated Branches:
  refs/heads/trunk af21be3b7 -> 7d4b72561


GIRAPH-756: Provide a way to halt running application (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7d4b7256
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7d4b7256
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7d4b7256

Branch: refs/heads/trunk
Commit: 7d4b725615916d26610afb2bc2ec71a774ed8cfb
Parents: af21be3
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Sep 16 11:11:28 2013 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Sep 16 11:12:12 2013 -0700

----------------------------------------------------------------------
 bin/giraph-env                                  |  13 +-
 bin/halt-application                            |  22 ++++
 .../java/org/apache/giraph/bsp/BspService.java  |  10 ++
 .../org/apache/giraph/conf/GiraphConstants.java |  23 ++++
 .../apache/giraph/graph/GraphTaskManager.java   |   2 +
 .../apache/giraph/job/DefaultJobObserver.java   |   5 +
 .../java/org/apache/giraph/job/GiraphJob.java   |   7 ++
 .../apache/giraph/job/GiraphJobObserver.java    |   7 ++
 .../apache/giraph/job/HaltApplicationUtils.java | 122 +++++++++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  |   5 +
 .../apache/giraph/zk/ZooKeeperNodeCreator.java  |  87 +++++++++++++
 11 files changed, 296 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/giraph-env
----------------------------------------------------------------------
diff --git a/bin/giraph-env b/bin/giraph-env
index 9aa78bb..34ff91f 100644
--- a/bin/giraph-env
+++ b/bin/giraph-env
@@ -41,13 +41,12 @@ USER_JAR=$1
 shift
 
 if [ ! -e "$USER_JAR" ]; then
-  echo "Can't find user jar (${USER_JAR}) to execute."
-  exit 1
+  echo "No user jar found at $USER_JAR"
+else
+  # add user jar to classpath
+  CLASSPATH=${USER_JAR}
 fi
 
-# add user jar to classpath
-CLASSPATH=${USER_JAR}
-
 # add Giraph conf dir to classpath
 CLASSPATH=$CLASSPATH:$GIRAPH_HOME/conf
 
@@ -71,7 +70,7 @@ if [ -d "$GIRAPH_HOME/lib" ]; then
 	done
 else
 	echo "No lib directory, assuming dev environment"
-	if [ ! -d "$GIRAPH_HOME/target" ]; then
+	if [ ! -d "$GIRAPH_HOME/giraph-core/target" ]; then
 		echo "No target directory. Build Giraph jar before proceeding."
 		exit 1
 	fi
@@ -79,7 +78,7 @@ else
 	CLASSPATH2=`mvn dependency:build-classpath | grep -v "[INFO]"`
 	CLASSPATH=$CLASSPATH:$CLASSPATH2	
 
-	for f in $GIRAPH_HOME/giraph/target/giraph*.jar; do
+	for f in $GIRAPH_HOME/giraph-core/target/giraph*.jar; do
 	  if [ -e "$f" ]; then
 	    JAR=$f
 	    break

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/halt-application
----------------------------------------------------------------------
diff --git a/bin/halt-application b/bin/halt-application
new file mode 100644
index 0000000..0526661
--- /dev/null
+++ b/bin/halt-application
@@ -0,0 +1,31 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Creates a ZooKeeper node by running org.apache.giraph.zk.ZooKeeperNodeCreator
+# through "hadoop jar"; all arguments of this script are handed to that class,
+# e.g. halt-application --zkServer <host:port list> --zkNode <node path>
+
+GIRAPH_ENV_DIR="$( cd -P "$( dirname "$0" )" && pwd )"
+# NOTE(review): giraph-env does USER_JAR=$1 followed by shift; being sourced
+# without arguments it works on this script's own arguments, so the first one
+# may be consumed before "$@" is used below - confirm the expected invocation.
+source "$GIRAPH_ENV_DIR/giraph-env"
+
+CLASS=org.apache.giraph.zk.ZooKeeperNodeCreator
+# Paths are quoted so directories containing spaces survive; HADOOP_PROPERTIES
+# and GIRAPH_JARS stay unquoted, exactly as before.
+exec "$HADOOP_HOME/bin/hadoop" --config "$HADOOP_CONF_DIR" jar "$JAR" $CLASS $HADOOP_PROPERTIES -libjars $GIRAPH_JARS  "$@"

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index aae01da..34f4b51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable,
       "/_partitionExchangeDir";
   /** Denotes that the superstep is done */
   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
+  /** Denotes that computation should be halted */
+  public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
   /** Denotes which workers have been cleaned up */
   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
   /** JSON partition stats key */
@@ -200,6 +202,8 @@ public abstract class BspService<I extends WritableComparable,
   protected final String checkpointBasePath;
   /** Path to the master election path */
   protected final String masterElectionPath;
+  /** If this path exists computation will be halted */
+  protected final String haltComputationPath;
   /** Private ZooKeeper instance that implements the service */
   private final ZooKeeperExt zk;
   /** Has the Connection occurred? */
@@ -318,6 +322,12 @@ public abstract class BspService<I extends WritableComparable,
         CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
             CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
+    haltComputationPath = basePath + HALT_COMPUTATION_NODE;
+    getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
+        haltComputationPath);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("BspService: Path to create to halt is " + haltComputationPath);
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
           ", " + getTaskPartition() + " on " + serverPortList);

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7c9b19a..4dadd29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -47,6 +47,7 @@ import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.job.DefaultJobObserver;
 import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.job.HaltApplicationUtils;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
@@ -923,5 +924,27 @@ public interface GiraphConstants {
   BooleanConfOption ONE_TO_ALL_MSG_SENDING =
     new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " +
         "one-to-all message sending strategy");
+
+  /**
+   * This counter group will contain one counter whose name is the ZooKeeper
+   * server:port which this job is using
+   */
+  String ZOOKEEPER_SERVER_PORT_COUNTER_GROUP = "Zookeeper server:port";
+
+  /**
+   * This counter group will contain one counter whose name is the ZooKeeper
+   * node path which should be created to trigger computation halt
+   */
+  String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node";
+
+  /**
+   * Which class to use to write instructions on how to halt the application
+   */
+  ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter>
+  HALT_INSTRUCTIONS_WRITER_CLASS = ClassConfOption.create(
+      "giraph.haltInstructionsWriter",
+      HaltApplicationUtils.DefaultHaltInstructionsWriter.class,
+      HaltApplicationUtils.HaltInstructionsWriter.class,
+      "Class used to write instructions on how to halt the application");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 704fb9e..3939d49 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -370,6 +370,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     }
     zkManager.onlineZooKeeperServers();
     serverPortList = zkManager.getZooKeeperServerPortString();
+    context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
+        serverPortList);
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
index 2e703ca..ca331b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
@@ -46,6 +46,11 @@ public class DefaultJobObserver implements GiraphJobObserver,
   }
 
   @Override
+  public void jobRunning(Job submittedJob) {
+    // Intentionally empty: the default observer ignores this callback
+  }
+
+  @Override
   public void jobFinished(Job jobToSubmit, boolean passed) {
     // do nothing
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 05b07a5..fca14ac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -242,6 +242,13 @@ public class GiraphJob {
 
     GiraphJobObserver jobObserver = conf.getJobObserver();
     jobObserver.launchingJob(submittedJob);
+    submittedJob.submit();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
+    }
+    HaltApplicationUtils.printHaltInfo(submittedJob, conf);
+    jobObserver.jobRunning(submittedJob);
+
     boolean passed = submittedJob.waitForCompletion(verbose);
     jobObserver.jobFinished(submittedJob, passed);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
index fbcc4f1..3905f77 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
@@ -31,6 +31,13 @@ public interface GiraphJobObserver {
   void launchingJob(Job jobToSubmit);
 
   /**
+   * Callback invoked after the job was submitted, before its completion is
+   * awaited. For example, you can track its progress here.
+   * @param submittedJob Job which was submitted.
+   */
+  void jobRunning(Job submittedJob);
+
+  /**
    * Callback when job finishes.
    * @param submittedJob Job that ran in hadoop.
    * @param passed true if job succeeded.

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
new file mode 100644
index 0000000..28b5781
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Utility methods for halting application while running.
+ *
+ * The job publishes the ZooKeeper server list and the halt node path as
+ * counter names, one counter group each; these methods read them back on
+ * the client side.
+ */
+public class HaltApplicationUtils {
+  /** Milliseconds to sleep for while waiting for halt info */
+  private static final int SLEEP_MSECS = 100;
+
+  /** Do not instantiate */
+  private HaltApplicationUtils() { }
+
+  /**
+   * Wait until a counter group contains at least one counter
+   *
+   * @param submittedJob Submitted job
+   * @param counterGroup Name of the counter group to poll
+   * @return True if the group got a counter, false if job completed before
+   * that happened
+   * @throws IOException If polling the job fails
+   * @throws InterruptedException If interrupted while sleeping between polls
+   */
+  private static boolean waitForCounterGroup(Job submittedJob,
+      String counterGroup) throws IOException, InterruptedException {
+    while (submittedJob.getCounters().getGroup(counterGroup).size() == 0) {
+      if (submittedJob.isComplete()) {
+        return false;
+      }
+      Thread.sleep(SLEEP_MSECS);
+    }
+    return true;
+  }
+
+  /**
+   * Wait for halt info (zk server and node) to become available
+   *
+   * @param submittedJob Submitted job
+   * @return True if halt info became available, false if job completed
+   * before it became available
+   * @throws IOException If polling the job fails
+   */
+  private static boolean waitForHaltInfo(Job submittedJob) throws IOException {
+    try {
+      return waitForCounterGroup(submittedJob,
+          GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP) &&
+          waitForCounterGroup(submittedJob,
+              GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status cleared by the InterruptedException
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "waitForHaltInfo: InterruptedException occurred", e);
+    }
+  }
+
+  /**
+   * Get the name of the first counter in a counter group
+   *
+   * @param submittedJob Submitted job
+   * @param counterGroup Name of a counter group with at least one counter
+   * @return Name of the first counter in the group
+   * @throws IOException If fetching counters fails
+   */
+  private static String firstCounterName(Job submittedJob,
+      String counterGroup) throws IOException {
+    return submittedJob.getCounters().getGroup(counterGroup).iterator()
+        .next().getName();
+  }
+
+  /**
+   * Wait for halt info to become available and print instructions on how to
+   * halt
+   *
+   * @param submittedJob Submitted job
+   * @param conf Configuration
+   * @throws IOException If polling the job fails
+   */
+  public static void printHaltInfo(Job submittedJob,
+      GiraphConfiguration conf) throws IOException {
+    if (waitForHaltInfo(submittedJob)) {
+      String zkServer = firstCounterName(submittedJob,
+          GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+      String haltNode = firstCounterName(submittedJob,
+          GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+      GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf)
+          .writeHaltInstructions(zkServer, haltNode);
+    }
+  }
+
+  /**
+   * Writer of instructions about how to halt
+   */
+  public interface HaltInstructionsWriter {
+    /**
+     * Write instructions about how to halt
+     *
+     * @param zkServer ZooKeeper server
+     * @param haltNode ZooKeeper node which should be created in order to halt
+     */
+    void writeHaltInstructions(String zkServer, String haltNode);
+  }
+
+  /**
+   * Default implementation of {@link HaltInstructionsWriter} - points to how
+   * to use {@link org.apache.giraph.zk.ZooKeeperNodeCreator} to halt
+   */
+  public static class DefaultHaltInstructionsWriter implements
+      HaltInstructionsWriter {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(
+        DefaultHaltInstructionsWriter.class);
+
+    @Override
+    public void writeHaltInstructions(String zkServer, String haltNode) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("writeHaltInstructions: " +
+            "To halt after next superstep execute: " +
+            "'bin/halt-application --zkServer " + zkServer +
+            " --zkNode " + haltNode + "'");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 454c934..f043c61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1634,6 +1634,11 @@ public class BspServiceMaster<I extends WritableComparable,
         globalStats.getVertexCount() &&
         globalStats.getMessageCount() == 0)) {
       globalStats.setHaltComputation(true);
+    } else if (getZkExt().exists(haltComputationPath, false) != null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Halting computation because halt zookeeper node was created");
+      }
+      globalStats.setHaltComputation(true);
     }
 
     // If we have completed the maximum number of supersteps, stop

http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
new file mode 100644
index 0000000..2ffa80a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+
+import static java.lang.System.out;
+
+/**
+ * A utility class to be used to create a ZooKeeper node.
+ *
+ * Run through {@link ToolRunner}; expects --zkServer and --zkNode options.
+ */
+public class ZooKeeperNodeCreator implements Tool, Watcher {
+  /** The configuration */
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Parse the command line and create the requested ZooKeeper node
+   *
+   * @param args Command line arguments (--zkServer and --zkNode)
+   * @return 0 on success or when only usage was requested by passing no
+   * arguments, -1 if a required option is missing
+   * @throws Exception If parsing the arguments or creating the node fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("zk", "zkServer", true,
+        "List of host:port ZooKeeper servers");
+    options.addOption("n", "zkNode", true,
+        "ZooKeeper node to create");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (!cmd.hasOption("zkServer") || !cmd.hasOption("zkNode")) {
+      // Without both values there is nothing sensible to connect to or
+      // create, so show usage instead of handing nulls to ZooKeeperExt
+      formatter.printHelp(getClass().getName(), options, true);
+      return -1;
+    }
+
+    ZooKeeperExt zkExt = new ZooKeeperExt(cmd.getOptionValue("zkServer"),
+        30 * 1000, 5, 1000, this);
+    try {
+      zkExt.createExt(cmd.getOptionValue("zkNode"), new byte[0],
+          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
+    } finally {
+      // Release the ZooKeeper session whether or not the create succeeded
+      zkExt.close();
+    }
+    return 0;
+  }
+
+  /**
+   * Print every ZooKeeper event this watcher receives to standard output
+   *
+   * @param event Event delivered by ZooKeeper
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    out.println("process: ZK event received: " + event);
+  }
+
+  /**
+   * Entry point from shell script
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new ZooKeeperNodeCreator(), args));
+  }
+}