You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/09/16 20:17:11 UTC
git commit: updated refs/heads/trunk to 7d4b725
Updated Branches:
refs/heads/trunk af21be3b7 -> 7d4b72561
GIRAPH-756: Provide a way to halt running application (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7d4b7256
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7d4b7256
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7d4b7256
Branch: refs/heads/trunk
Commit: 7d4b725615916d26610afb2bc2ec71a774ed8cfb
Parents: af21be3
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Sep 16 11:11:28 2013 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Sep 16 11:12:12 2013 -0700
----------------------------------------------------------------------
bin/giraph-env | 13 +-
bin/halt-application | 22 ++++
.../java/org/apache/giraph/bsp/BspService.java | 10 ++
.../org/apache/giraph/conf/GiraphConstants.java | 23 ++++
.../apache/giraph/graph/GraphTaskManager.java | 2 +
.../apache/giraph/job/DefaultJobObserver.java | 5 +
.../java/org/apache/giraph/job/GiraphJob.java | 7 ++
.../apache/giraph/job/GiraphJobObserver.java | 7 ++
.../apache/giraph/job/HaltApplicationUtils.java | 122 +++++++++++++++++++
.../apache/giraph/master/BspServiceMaster.java | 5 +
.../apache/giraph/zk/ZooKeeperNodeCreator.java | 87 +++++++++++++
11 files changed, 296 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/giraph-env
----------------------------------------------------------------------
diff --git a/bin/giraph-env b/bin/giraph-env
index 9aa78bb..34ff91f 100644
--- a/bin/giraph-env
+++ b/bin/giraph-env
@@ -41,13 +41,12 @@ USER_JAR=$1
shift
if [ ! -e "$USER_JAR" ]; then
- echo "Can't find user jar (${USER_JAR}) to execute."
- exit 1
+ echo "No user jar found at $USER_JAR"
+else
+ # add user jar to classpath
+ CLASSPATH=${USER_JAR}
fi
-# add user jar to classpath
-CLASSPATH=${USER_JAR}
-
# add Giraph conf dir to classpath
CLASSPATH=$CLASSPATH:$GIRAPH_HOME/conf
@@ -71,7 +70,7 @@ if [ -d "$GIRAPH_HOME/lib" ]; then
done
else
echo "No lib directory, assuming dev environment"
- if [ ! -d "$GIRAPH_HOME/target" ]; then
+ if [ ! -d "$GIRAPH_HOME/giraph-core/target" ]; then
echo "No target directory. Build Giraph jar before proceeding."
exit 1
fi
@@ -79,7 +78,7 @@ else
CLASSPATH2=`mvn dependency:build-classpath | grep -v "[INFO]"`
CLASSPATH=$CLASSPATH:$CLASSPATH2
- for f in $GIRAPH_HOME/giraph/target/giraph*.jar; do
+ for f in $GIRAPH_HOME/giraph-core/target/giraph*.jar; do
if [ -e "$f" ]; then
JAR=$f
break
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/halt-application
----------------------------------------------------------------------
diff --git a/bin/halt-application b/bin/halt-application
new file mode 100644
index 0000000..0526661
--- /dev/null
+++ b/bin/halt-application
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+GIRAPH_ENV_DIR="$( cd -P "$( dirname "$0" )" && pwd )"
+source $GIRAPH_ENV_DIR/giraph-env
+
+CLASS=org.apache.giraph.zk.ZooKeeperNodeCreator
+exec "$HADOOP_HOME/bin/hadoop" --config $HADOOP_CONF_DIR jar $JAR $CLASS $HADOOP_PROPERTIES -libjars $GIRAPH_JARS "$@"
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index aae01da..34f4b51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable,
"/_partitionExchangeDir";
/** Denotes that the superstep is done */
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
+ /** Denotes that computation should be halted */
+ public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
/** Denotes which workers have been cleaned up */
public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
/** JSON partition stats key */
@@ -200,6 +202,8 @@ public abstract class BspService<I extends WritableComparable,
protected final String checkpointBasePath;
/** Path to the master election path */
protected final String masterElectionPath;
+ /** If this path exists computation will be halted */
+ protected final String haltComputationPath;
/** Private ZooKeeper instance that implements the service */
private final ZooKeeperExt zk;
/** Has the Connection occurred? */
@@ -318,6 +322,12 @@ public abstract class BspService<I extends WritableComparable,
CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
+ haltComputationPath = basePath + HALT_COMPUTATION_NODE;
+ getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
+ haltComputationPath);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("BspService: Path to create to halt is " + haltComputationPath);
+ }
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
", " + getTaskPartition() + " on " + serverPortList);
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7c9b19a..4dadd29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -47,6 +47,7 @@ import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.job.HaltApplicationUtils;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
@@ -923,5 +924,27 @@ public interface GiraphConstants {
BooleanConfOption ONE_TO_ALL_MSG_SENDING =
new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " +
"one-to-all message sending strategy");
+
+ /**
+ * This counter group will contain one counter whose name is the ZooKeeper
+ * server:port which this job is using
+ */
+ String ZOOKEEPER_SERVER_PORT_COUNTER_GROUP = "Zookeeper server:port";
+
+ /**
+ * This counter group will contain one counter whose name is the ZooKeeper
+ * node path which should be created to trigger computation halt
+ */
+ String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node";
+
+ /**
+ * Which class to use to write instructions on how to halt the application
+ */
+ ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter>
+ HALT_INSTRUCTIONS_WRITER_CLASS = ClassConfOption.create(
+ "giraph.haltInstructionsWriter",
+ HaltApplicationUtils.DefaultHaltInstructionsWriter.class,
+ HaltApplicationUtils.HaltInstructionsWriter.class,
+ "Class used to write instructions on how to halt the application");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 704fb9e..3939d49 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -370,6 +370,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
zkManager.onlineZooKeeperServers();
serverPortList = zkManager.getZooKeeperServerPortString();
+ context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
+ serverPortList);
return false;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
index 2e703ca..ca331b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
@@ -46,6 +46,11 @@ public class DefaultJobObserver implements GiraphJobObserver,
}
@Override
+ public void jobRunning(Job submittedJob) {
+ // do nothing
+ }
+
+ @Override
public void jobFinished(Job jobToSubmit, boolean passed) {
// do nothing
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 05b07a5..fca14ac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -242,6 +242,13 @@ public class GiraphJob {
GiraphJobObserver jobObserver = conf.getJobObserver();
jobObserver.launchingJob(submittedJob);
+ submittedJob.submit();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
+ }
+ HaltApplicationUtils.printHaltInfo(submittedJob, conf);
+ jobObserver.jobRunning(submittedJob);
+
boolean passed = submittedJob.waitForCompletion(verbose);
jobObserver.jobFinished(submittedJob, passed);
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
index fbcc4f1..3905f77 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
@@ -31,6 +31,13 @@ public interface GiraphJobObserver {
void launchingJob(Job jobToSubmit);
/**
+ * Callback after job was submitted.
+ * For example, you can track its progress here.
+ * @param submittedJob Job which was submitted.
+ */
+ void jobRunning(Job submittedJob);
+
+ /**
* Callback when job finishes.
* @param submittedJob Job that ran in hadoop.
* @param passed true if job succeeded.
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
new file mode 100644
index 0000000..28b5781
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Utility methods for halting a running application.
+ */
+public class HaltApplicationUtils {
+  /** Milliseconds to sleep for while waiting for halt info */
+  private static final int SLEEP_MSECS = 100;
+
+  /** Do not instantiate */
+  private HaltApplicationUtils() { }
+
+  /**
+   * Poll the job's counters until the given counter group contains at least
+   * one counter, or until the job completes.
+   *
+   * @param submittedJob Submitted job
+   * @param counterGroup Name of the counter group to wait for
+   * @return True if the group became non-empty, false if the job completed
+   *         before it did
+   * @throws IOException If the job's counters or status cannot be fetched
+   * @throws InterruptedException If interrupted while sleeping between polls
+   */
+  private static boolean waitForCounterGroup(Job submittedJob,
+      String counterGroup) throws IOException, InterruptedException {
+    while (submittedJob.getCounters().getGroup(counterGroup).size() == 0) {
+      if (submittedJob.isComplete()) {
+        return false;
+      }
+      Thread.sleep(SLEEP_MSECS);
+    }
+    return true;
+  }
+
+  /**
+   * Wait for halt info (zk server and node) to become available
+   *
+   * @param submittedJob Submitted job
+   * @return True if halt info became available, false if job completed
+   *         before it became available
+   * @throws IOException If the job's counters or status cannot be fetched
+   */
+  private static boolean waitForHaltInfo(Job submittedJob) throws IOException {
+    try {
+      // Both groups must be populated; && short-circuits so that a job
+      // which completes while waiting for the first group stops the wait.
+      return waitForCounterGroup(submittedJob,
+          GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP) &&
+          waitForCounterGroup(submittedJob,
+              GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can
+      // still observe it after we convert to an unchecked exception.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "waitForHaltInfo: InterruptedException occurred", e);
+    }
+  }
+
+  /**
+   * Get the name of the first counter in one of the job's counter groups.
+   *
+   * @param submittedJob Submitted job
+   * @param counterGroup Name of a counter group known to be non-empty
+   * @return Name of the first counter in the group
+   * @throws IOException If the job's counters cannot be fetched
+   */
+  private static String getFirstCounterName(Job submittedJob,
+      String counterGroup) throws IOException {
+    return submittedJob.getCounters().getGroup(counterGroup).iterator()
+        .next().getName();
+  }
+
+  /**
+   * Wait for halt info to become available and print instructions on how to
+   * halt. Does nothing if the job completes before the info is published.
+   *
+   * @param submittedJob Submitted job
+   * @param conf Configuration
+   * @throws IOException If the job's counters or status cannot be fetched
+   */
+  public static void printHaltInfo(Job submittedJob,
+      GiraphConfiguration conf) throws IOException {
+    if (!waitForHaltInfo(submittedJob)) {
+      return;
+    }
+    String zkServer = getFirstCounterName(submittedJob,
+        GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+    String haltNode = getFirstCounterName(submittedJob,
+        GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+    GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf)
+        .writeHaltInstructions(zkServer, haltNode);
+  }
+
+  /**
+   * Writer of instructions about how to halt
+   */
+  public interface HaltInstructionsWriter {
+    /**
+     * Write instructions about how to halt
+     *
+     * @param zkServer ZooKeeper server
+     * @param haltNode ZooKeeper node which should be created in order to halt
+     */
+    void writeHaltInstructions(String zkServer, String haltNode);
+  }
+
+  /**
+   * Default implementation of {@link HaltInstructionsWriter} - points to how
+   * to use {@link org.apache.giraph.zk.ZooKeeperNodeCreator} to halt
+   */
+  public static class DefaultHaltInstructionsWriter implements
+      HaltInstructionsWriter {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(
+        DefaultHaltInstructionsWriter.class);
+
+    @Override
+    public void writeHaltInstructions(String zkServer, String haltNode) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("writeHaltInstructions: " +
+            "To halt after next superstep execute: " +
+            "'bin/halt-application --zkServer " + zkServer +
+            " --zkNode " + haltNode + "'");
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 454c934..f043c61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1634,6 +1634,11 @@ public class BspServiceMaster<I extends WritableComparable,
globalStats.getVertexCount() &&
globalStats.getMessageCount() == 0)) {
globalStats.setHaltComputation(true);
+ } else if (getZkExt().exists(haltComputationPath, false) != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Halting computation because halt zookeeper node was created");
+ }
+ globalStats.setHaltComputation(true);
}
// If we have completed the maximum number of supersteps, stop
http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
new file mode 100644
index 0000000..2ffa80a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+
+import static java.lang.System.out;
+
+/** A utility class to be used to create a ZooKeeper node */
+public class ZooKeeperNodeCreator implements Tool, Watcher {
+  /** The configuration */
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Parse {@code --zkServer} and {@code --zkNode} and create the requested
+   * node as a persistent node with an open ACL.
+   *
+   * @param args Command line arguments
+   * @return 0 on success or when only usage was requested, 1 when a required
+   *         option is missing
+   * @throws Exception If parsing fails or the ZooKeeper operation fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("zk", "zkServer", true,
+        "List of host:port ZooKeeper servers");
+    options.addOption("n", "zkNode", true,
+        "ZooKeeper node to create");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    String zkServer = cmd.getOptionValue("zkServer");
+    String zkNode = cmd.getOptionValue("zkNode");
+    if (zkServer == null || zkNode == null) {
+      // getOptionValue returns null for absent options; report usage here
+      // instead of handing null to the ZooKeeper client.
+      out.println("run: Both --zkServer and --zkNode must be specified");
+      formatter.printHelp(getClass().getName(), options, true);
+      return 1;
+    }
+
+    ZooKeeperExt zkExt = new ZooKeeperExt(zkServer, 30 * 1000, 5, 1000, this);
+    try {
+      zkExt.createExt(zkNode, new byte[0],
+          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
+    } finally {
+      // Always release the ZooKeeper session, even if node creation fails.
+      zkExt.close();
+    }
+    return 0;
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    out.println("process: ZK event received: " + event);
+  }
+
+  /**
+   * Entry point from shell script
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new ZooKeeperNodeCreator(), args));
+  }
+}