You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/01/28 20:56:39 UTC
git commit: updated refs/heads/trunk to 7cc5457
Updated Branches:
refs/heads/trunk d1a061e1a -> 7cc54575d
GIRAPH-792: Print job progress to command line (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7cc54575
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7cc54575
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7cc54575
Branch: refs/heads/trunk
Commit: 7cc54575d867e37a43020df309a78cd65c3fbdc0
Parents: d1a061e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Jan 28 11:49:58 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Jan 28 11:56:10 2014 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/bsp/BspService.java | 13 +-
.../apache/giraph/conf/GiraphConfiguration.java | 9 +
.../org/apache/giraph/conf/GiraphConstants.java | 11 +
.../apache/giraph/graph/ComputeCallable.java | 12 +
.../apache/giraph/graph/GraphTaskManager.java | 23 +-
.../giraph/job/CombinedWorkerProgress.java | 118 ++++++
.../java/org/apache/giraph/job/GiraphJob.java | 5 +
.../apache/giraph/job/HaltApplicationUtils.java | 56 +--
.../apache/giraph/job/JobProgressTracker.java | 149 ++++++++
.../apache/giraph/master/BspServiceMaster.java | 8 +-
.../org/apache/giraph/utils/CounterUtils.java | 57 +++
.../apache/giraph/worker/BspServiceWorker.java | 37 +-
.../giraph/worker/EdgeInputSplitsCallable.java | 5 +
.../worker/VertexInputSplitsCallable.java | 6 +
.../apache/giraph/worker/WorkerProgress.java | 369 +++++++++++++++++++
.../giraph/worker/WorkerProgressWriter.java | 74 ++++
17 files changed, 889 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7435a92..971ab46 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-792: Print job progress to command line (majakabiljo)
+
GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching)
GIRAPH-830: directMemory used in netty message (pavanka via aching)
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 86823ed..ec0ddbb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable,
"/_partitionExchangeDir";
/** Denotes that the superstep is done */
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
+ /** Stores progress info for workers */
+ public static final String WORKER_PROGRESSES = "/_workerProgresses";
/** Denotes that computation should be halted */
public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
/** Denotes which workers have been cleaned up */
@@ -202,6 +204,8 @@ public abstract class BspService<I extends WritableComparable,
protected final String checkpointBasePath;
/** Path to the master election path */
protected final String masterElectionPath;
+ /** Stores progress info of this worker */
+ protected final String myProgressPath;
/** If this path exists computation will be halted */
protected final String haltComputationPath;
/** Private ZooKeeper instance that implements the service */
@@ -253,11 +257,10 @@ public abstract class BspService<I extends WritableComparable,
/**
* Constructor.
*
- * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
*/
- public BspService(int sessionMsecTimeout,
+ public BspService(
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
this.vertexInputSplitsEvents = new InputSplitEvents(context);
@@ -307,6 +310,8 @@ public abstract class BspService<I extends WritableComparable,
this.checkpointFrequency = conf.getCheckpointFrequency();
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
+ getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
+ basePath);
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
vertexInputSplitsPaths = new InputSplitPaths(basePath,
VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
@@ -320,6 +325,7 @@ public abstract class BspService<I extends WritableComparable,
CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
+ myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
String serverPortList = conf.getZookeeperList();
haltComputationPath = basePath + HALT_COMPUTATION_NODE;
getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
@@ -333,7 +339,7 @@ public abstract class BspService<I extends WritableComparable,
}
try {
this.zk = new ZooKeeperExt(serverPortList,
- sessionMsecTimeout,
+ conf.getZooKeeperSessionTimeout(),
conf.getZookeeperOpsMaxAttempts(),
conf.getZookeeperOpsRetryWaitMsecs(),
this,
@@ -345,7 +351,6 @@ public abstract class BspService<I extends WritableComparable,
}
}
-
/**
* Get the superstep from a ZooKeeper path
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 8cf403a..abc81e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1197,4 +1197,13 @@ public class GiraphConfiguration extends Configuration
public void setWaitTaskDoneTimeoutMs(int ms) {
WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
}
+
+ /**
+ * Check whether to track job progress on client or not
+ *
+ * @return True if job progress should be tracked on client
+ */
+ public boolean trackJobProgressOnClient() {
+ return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4e68308..9271152 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -996,6 +996,12 @@ public interface GiraphConstants {
String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node";
/**
+ * This counter group will contain one counter whose name is the ZooKeeper
+ * node path which contains all data about this job
+ */
+ String ZOOKEEPER_BASE_PATH_COUNTER_GROUP = "Zookeeper base path";
+
+ /**
* Which class to use to write instructions on how to halt the application
*/
ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter>
@@ -1013,5 +1019,10 @@ public interface GiraphConstants {
new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15),
"Maximum timeout (in ms) for waiting for all all tasks to " +
"complete");
+
+ /** Whether to track job progress on client or not */
+ BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT =
+ new BooleanConfOption("giraph.trackJobProgressOnClient", true,
+ "Whether to track job progress on client or not");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 1fe1d10..0303530 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -35,6 +35,7 @@ import org.apache.giraph.time.Times;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -74,6 +75,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
/** Class time object */
private static final Time TIME = SystemTime.get();
+ /** How often to update WorkerProgress */
+ private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
/** Context */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state */
@@ -229,6 +232,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
Partition<I, V, E> partition) throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
+ long verticesComputedProgress = 0;
// Make sure this is thread-safe across runs
synchronized (partition) {
for (Vertex<I, V, E> vertex : partition) {
@@ -260,10 +264,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
// Add statistics for this vertex
partitionStats.incrVertexCount();
partitionStats.addEdgeCount(vertex.getNumEdges());
+
+ verticesComputedProgress++;
+ if (verticesComputedProgress == VERTICES_TO_UPDATE_PROGRESS) {
+ WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
+ verticesComputedProgress = 0;
+ }
}
messageStore.clearPartition(partition.getId());
}
+ WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
+ WorkerProgress.get().incrementPartitionsComputed();
return partitionStats;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 0617973..a84ac66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -45,6 +45,7 @@ import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.InputSplitsCallable;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
+import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -215,9 +216,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
Thread.sleep(GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT *
GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
}
- int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
try {
- instantiateBspService(sessionMsecTimeout);
+ instantiateBspService();
} catch (IOException e) {
LOG.error("setup: Caught exception just before end of setup", e);
if (zkManager != null) {
@@ -537,17 +537,15 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Instantiate the appropriate BspService object (Master or Worker)
* for this compute node.
- * @param sessionMsecTimeout configurable session timeout
*/
- private void instantiateBspService(int sessionMsecTimeout)
+ private void instantiateBspService()
throws IOException, InterruptedException {
if (graphFunctions.isMaster()) {
if (LOG.isInfoEnabled()) {
LOG.info("setup: Starting up BspServiceMaster " +
"(master thread)...");
}
- serviceMaster = new BspServiceMaster<I, V, E>(
- sessionMsecTimeout, context, this);
+ serviceMaster = new BspServiceMaster<I, V, E>(context, this);
masterThread = new MasterThread<I, V, E>(serviceMaster, context);
masterThread.start();
}
@@ -555,8 +553,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (LOG.isInfoEnabled()) {
LOG.info("setup: Starting up BspServiceWorker...");
}
- serviceWorker = new BspServiceWorker<I, V, E>(
- sessionMsecTimeout, context, this);
+ serviceWorker = new BspServiceWorker<I, V, E>(context, this);
if (LOG.isInfoEnabled()) {
LOG.info("setup: Registering health of this worker...");
}
@@ -711,10 +708,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
int numThreads) {
final BlockingQueue<Integer> computePartitionIdQueue =
new ArrayBlockingQueue<Integer>(numPartitions);
+ long verticesToCompute = 0;
for (Integer partitionId :
serviceWorker.getPartitionStore().getPartitionIds()) {
computePartitionIdQueue.add(partitionId);
- }
+ verticesToCompute +=
+ serviceWorker.getPartitionStore().getOrCreatePartition(
+ partitionId).getVertexCount();
+ }
+ WorkerProgress.get().startSuperstep(
+ serviceWorker.getSuperstep(),
+ verticesToCompute,
+ serviceWorker.getPartitionStore().getNumPartitions());
GiraphTimerContext computeAllTimerContext = computeAll.time();
timeToFirstMessageTimerContext = timeToFirstMessage.time();
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
new file mode 100644
index 0000000..0810040
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.worker.WorkerProgress;
+
+/**
+ * Aggregates the progress reported by individual workers into a single
+ * application-wide view
+ */
+public class CombinedWorkerProgress extends WorkerProgress {
+  /** Number of workers reporting the highest superstep seen so far */
+  private int workersInSuperstep = 0;
+  /** Number of workers which reported that storing is done */
+  private int workersDone = 0;
+
+  /**
+   * Constructor
+   *
+   * @param workerProgresses Worker progresses to combine
+   */
+  public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses) {
+    for (WorkerProgress reported : workerProgresses) {
+      if (reported.getCurrentSuperstep() > currentSuperstep) {
+        // A later superstep was reported, start counting it from scratch
+        restartAt(reported);
+      }
+      if (reported.getCurrentSuperstep() == currentSuperstep) {
+        workersInSuperstep++;
+        accumulate(reported);
+      }
+      if (reported.isStoringDone()) {
+        workersDone++;
+      }
+    }
+  }
+
+  /**
+   * Forget compute progress gathered so far and move to the superstep of
+   * the given worker
+   *
+   * @param reported Worker progress with a later superstep
+   */
+  private void restartAt(WorkerProgress reported) {
+    currentSuperstep = reported.getCurrentSuperstep();
+    workersInSuperstep = 0;
+    verticesToCompute = 0;
+    verticesComputed = 0;
+    partitionsToCompute = 0;
+    partitionsComputed = 0;
+  }
+
+  /**
+   * Add counters of a worker which is in the current superstep
+   *
+   * @param reported Worker progress to add
+   */
+  private void accumulate(WorkerProgress reported) {
+    if (isInputSuperstep()) {
+      verticesLoaded += reported.getVerticesLoaded();
+      vertexInputSplitsLoaded += reported.getVertexInputSplitsLoaded();
+      edgesLoaded += reported.getEdgesLoaded();
+      edgeInputSplitsLoaded += reported.getEdgeInputSplitsLoaded();
+    } else if (isComputeSuperstep()) {
+      verticesToCompute += reported.getVerticesToCompute();
+      verticesComputed += reported.getVerticesComputed();
+      partitionsToCompute += reported.getPartitionsToCompute();
+      partitionsComputed += reported.getPartitionsComputed();
+    } else if (isOutputSuperstep()) {
+      verticesToStore += reported.getVerticesToStore();
+      verticesStored += reported.getVerticesStored();
+      partitionsToStore += reported.getPartitionsToStore();
+      partitionsStored += reported.getPartitionsStored();
+    }
+  }
+
+  /**
+   * Is the application done
+   *
+   * @param expectedWorkersDone Number of workers which have to be done for
+   *                            the application to be done
+   * @return True if application is done
+   */
+  public boolean isDone(int expectedWorkersDone) {
+    return expectedWorkersDone == workersDone;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Data from ");
+    sb.append(workersInSuperstep).append(" workers - ");
+    if (isInputSuperstep()) {
+      sb.append("Loading data: ")
+          .append(verticesLoaded).append(" vertices loaded, ")
+          .append(vertexInputSplitsLoaded)
+          .append(" vertex input splits loaded; ")
+          .append(edgesLoaded).append(" edges loaded, ")
+          .append(edgeInputSplitsLoaded).append(" edge input splits loaded");
+    } else if (isComputeSuperstep()) {
+      sb.append("Compute superstep ").append(currentSuperstep).append(": ")
+          .append(verticesComputed).append(" out of ")
+          .append(verticesToCompute).append(" vertices computed; ")
+          .append(partitionsComputed).append(" out of ")
+          .append(partitionsToCompute).append(" partitions computed");
+    } else if (isOutputSuperstep()) {
+      sb.append("Storing data: ")
+          .append(verticesStored).append(" out of ")
+          .append(verticesToStore).append(" vertices stored; ")
+          .append(partitionsStored).append(" out of ")
+          .append(partitionsToStore).append(" partitions stored");
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 40670bb..4a1f02e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -252,9 +252,14 @@ public class GiraphJob {
LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
}
HaltApplicationUtils.printHaltInfo(submittedJob, conf);
+ JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ?
+ new JobProgressTracker(submittedJob, conf) : null;
jobObserver.jobRunning(submittedJob);
boolean passed = submittedJob.waitForCompletion(verbose);
+ if (jobProgressTracker != null) {
+ jobProgressTracker.stop();
+ }
jobObserver.jobFinished(submittedJob, passed);
if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
return passed;
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
index 28b5781..8150de6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
@@ -20,69 +20,33 @@ package org.apache.giraph.job;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CounterUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
-import java.io.IOException;
-
/**
* Utility methods for halting application while running
*/
public class HaltApplicationUtils {
- /** Milliseconds to sleep for while waiting for halt info */
- private static final int SLEEP_MSECS = 100;
-
/** Do not instantiate */
private HaltApplicationUtils() { }
/**
- * Wait for halt info (zk server and node) to become available
- *
- * @param submittedJob Submitted job
- * @return True if halt info became available, false if job completed
- * before it became available
- */
- private static boolean waitForHaltInfo(Job submittedJob) throws IOException {
- try {
- while (submittedJob.getCounters().getGroup(
- GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).size() == 0) {
- if (submittedJob.isComplete()) {
- return false;
- }
- Thread.sleep(SLEEP_MSECS);
- }
- while (submittedJob.getCounters().getGroup(
- GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).size() == 0) {
- if (submittedJob.isComplete()) {
- return false;
- }
- Thread.sleep(SLEEP_MSECS);
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "waitForHaltInfo: InterruptedException occurred", e);
- }
- return true;
- }
-
- /**
* Wait for halt info to become available and print instructions on how to
* halt
*
* @param submittedJob Submitted job
- * @param conf Configuration
+ * @param conf Configuration
*/
public static void printHaltInfo(Job submittedJob,
- GiraphConfiguration conf) throws IOException {
- if (waitForHaltInfo(submittedJob)) {
- String zkServer = submittedJob.getCounters().getGroup(
- GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).iterator()
- .next().getName();
- String haltNode = submittedJob.getCounters().getGroup(
- GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).iterator()
- .next().getName();
- GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf)
- .writeHaltInstructions(zkServer, haltNode);
+ GiraphConfiguration conf) {
+ String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
+ submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+ String haltNode = CounterUtils.waitAndGetCounterNameFromGroup(
+ submittedJob, GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+ if (zkServer != null && haltNode != null) {
+ GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(
+ conf).writeHaltInstructions(zkServer, haltNode);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
new file mode 100644
index 0000000..f685344
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CounterUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class which tracks job's progress on client
+ */
+public class JobProgressTracker implements Watcher {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(JobProgressTracker.class);
+  /** How often to print job's progress */
+  private static final int UPDATE_MILLISECONDS = 5 * 1000;
+  /** Thread which periodically writes job's progress; null if not started */
+  private Thread writerThread;
+  /** ZooKeeperExt; null if tracking was never started */
+  private ZooKeeperExt zk;
+  /** Whether application is finished */
+  private volatile boolean finished = false;
+
+  /**
+   * Constructor. Blocks until the job has published its ZooKeeper server
+   * list and base path through counters, then starts a thread which
+   * periodically logs the combined progress of all workers.
+   *
+   * If the job completes before both counters appear there is nothing to
+   * track: no ZooKeeper connection is opened and no thread is started.
+   *
+   * @param submittedJob Job to track
+   * @param conf Configuration
+   * @throws IOException May be thrown while connecting to ZooKeeper
+   * @throws InterruptedException May be thrown while connecting to ZooKeeper
+   */
+  public JobProgressTracker(final Job submittedJob,
+      final GiraphConfiguration conf) throws IOException, InterruptedException {
+    String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+    final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
+    if (zkServer == null || basePath == null) {
+      // The job completed before publishing where its ZooKeeper data lives
+      if (LOG.isInfoEnabled()) {
+        LOG.info("JobProgressTracker: Job completed before ZooKeeper info " +
+            "became available, not tracking progress");
+      }
+      return;
+    }
+    // Connect to ZooKeeper
+    zk = new ZooKeeperExt(
+        zkServer,
+        conf.getZooKeeperSessionTimeout(),
+        conf.getZookeeperOpsMaxAttempts(),
+        conf.getZookeeperOpsRetryWaitMsecs(),
+        this,
+        new Progressable() {
+          @Override
+          public void progress() {
+          }
+        });
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES;
+        try {
+          while (!finished) {
+            if (zk.exists(workerProgressBasePath, false) != null) {
+              // Get locations of all worker progresses
+              List<String> workerProgressPaths = zk.getChildrenExt(
+                  workerProgressBasePath, false, false, true);
+              List<WorkerProgress> workerProgresses =
+                  new ArrayList<WorkerProgress>(workerProgressPaths.size());
+              // Read all worker progresses
+              for (String workerProgressPath : workerProgressPaths) {
+                WorkerProgress workerProgress = new WorkerProgress();
+                byte[] zkData = zk.getData(workerProgressPath, false, null);
+                WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
+                workerProgresses.add(workerProgress);
+              }
+              // Combine and log
+              CombinedWorkerProgress combinedWorkerProgress =
+                  new CombinedWorkerProgress(workerProgresses);
+              if (LOG.isInfoEnabled()) {
+                LOG.info(combinedWorkerProgress.toString());
+              }
+              // Check if application is done
+              if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+                break;
+              }
+            }
+            Thread.sleep(UPDATE_MILLISECONDS);
+          }
+        } catch (InterruptedException | KeeperException e) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("run: Exception occurred", e);
+          }
+        } finally {
+          try {
+            try {
+              // Create a node so master knows we stopped communicating with
+              // ZooKeeper and it's safe to cleanup
+              zk.createExt(
+                  basePath + BspService.CLEANED_UP_DIR + "/client",
+                  null,
+                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                  CreateMode.PERSISTENT,
+                  true);
+            } finally {
+              // Release the connection even if the node could not be created
+              zk.close();
+            }
+          } catch (InterruptedException | KeeperException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("run: Exception occurred", e);
+            }
+          }
+        }
+      }
+    });
+    writerThread.start();
+  }
+
+  /**
+   * Stop the thread which logs application progress
+   */
+  public void stop() {
+    finished = true;
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 78487ef..cfee4c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -187,15 +187,13 @@ public class BspServiceMaster<I extends WritableComparable,
/**
* Constructor for setting up the master.
*
- * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
*/
public BspServiceMaster(
- int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
- super(sessionMsecTimeout, context, graphTaskManager);
+ super(context, graphTaskManager);
workerWroteCheckpoint = new PredicateLock(context);
registerBspEvent(workerWroteCheckpoint);
superstepStateChanged = new PredicateLock(context);
@@ -1725,6 +1723,10 @@ public class BspServiceMaster<I extends WritableComparable,
GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
maxTasks *= 2;
}
+ if (getConfiguration().trackJobProgressOnClient()) {
+ // For job client
+ maxTasks++;
+ }
List<String> cleanedUpChildrenList = null;
while (true) {
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
new file mode 100644
index 0000000..afec660
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/** Utility methods for dealing with counters */
+public class CounterUtils {
+  /** Milliseconds to sleep for while waiting for counter to appear */
+  private static final int SLEEP_MSECS = 100;
+
+  /** Do not instantiate */
+  private CounterUtils() {
+  }
+
+  /**
+   * Wait for a counter to appear in a group and then return the name of that
+   * counter. If job finishes before counter appears, return null.
+   *
+   * Counters which are not available yet (the job client may hand back
+   * null) are treated the same as a group which is still empty.
+   *
+   * @param job Job
+   * @param group Name of the counter group
+   * @return Name of the counter inside of the group, or null if job finishes
+   *         before counter appears
+   * @throws IllegalStateException If the counters cannot be read or the
+   *         calling thread is interrupted while waiting
+   */
+  public static String waitAndGetCounterNameFromGroup(Job job, String group) {
+    try {
+      Counters counters = job.getCounters();
+      while (counters == null || counters.getGroup(group).size() == 0) {
+        if (job.isComplete()) {
+          return null;
+        }
+        Thread.sleep(SLEEP_MSECS);
+        counters = job.getCounters();
+      }
+      // Read from the same snapshot which was just checked to be non-empty
+      return counters.getGroup(group).iterator().next().getName();
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "waitAndGetCounterNameFromGroup: Exception occurred", e);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "waitAndGetCounterNameFromGroup: Exception occurred", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index bc29b03..13de188 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -159,6 +159,8 @@ public class BspServiceWorker<I extends WritableComparable,
/** array of observers to call back to */
private final WorkerObserver[] observers;
+ /** Writer for worker progress */
+ private final WorkerProgressWriter workerProgressWriter;
// Per-Superstep Metrics
/** Timer for WorkerContext#postSuperstep */
@@ -169,18 +171,16 @@ public class BspServiceWorker<I extends WritableComparable,
/**
* Constructor for setting up the worker.
*
- * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
* @throws IOException
* @throws InterruptedException
*/
public BspServiceWorker(
- int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager)
throws IOException, InterruptedException {
- super(sessionMsecTimeout, context, graphTaskManager);
+ super(context, graphTaskManager);
ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
@@ -207,6 +207,9 @@ public class BspServiceWorker<I extends WritableComparable,
}
observers = conf.createWorkerObservers();
+ workerProgressWriter = conf.trackJobProgressOnClient() ?
+ new WorkerProgressWriter(myProgressPath, getZkExt()) : null;
+
GiraphMetrics.get().addSuperstepResetObserver(this);
}
@@ -515,6 +518,7 @@ public class BspServiceWorker<I extends WritableComparable,
} else {
vertexEdgeCount = new VertexEdgeCount();
}
+ WorkerProgress.get().finishLoadingVertices();
if (getConfiguration().hasEdgeInputFormat()) {
// Ensure the edge InputSplits are ready for processing
@@ -531,6 +535,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
getContext().progress();
}
+ WorkerProgress.get().finishLoadingEdges();
if (LOG.isInfoEnabled()) {
LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
@@ -951,10 +956,21 @@ public class BspServiceWorker<I extends WritableComparable,
new ArrayBlockingQueue<Integer>(numPartitions);
Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+ long verticesToStore = 0;
+ for (int partitionId : getPartitionStore().getPartitionIds()) {
+ verticesToStore += getPartitionStore().getOrCreatePartition(
+ partitionId).getVertexCount();
+ }
+ WorkerProgress.get().startStoring(
+ verticesToStore, getPartitionStore().getNumPartitions());
+
CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
@Override
public Callable<Void> newCallable(int callableId) {
return new Callable<Void>() {
+ /** How often to update WorkerProgress */
+ private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
+
@Override
public Void call() throws Exception {
VertexWriter<I, V, E> vertexWriter =
@@ -962,6 +978,7 @@ public class BspServiceWorker<I extends WritableComparable,
vertexWriter.setConf(getConfiguration());
vertexWriter.initialize(getContext());
long nextPrintVertices = 0;
+ long nextUpdateProgressVertices = 0;
long nextPrintMsecs = System.currentTimeMillis() + 15000;
int partitionIndex = 0;
int numPartitions = getPartitionStore().getNumPartitions();
@@ -989,9 +1006,18 @@ public class BspServiceWorker<I extends WritableComparable,
nextPrintMsecs = System.currentTimeMillis() + 15000;
nextPrintVertices = verticesWritten + 250000;
}
+
+ if (verticesWritten >= nextUpdateProgressVertices) {
+ WorkerProgress.get().addVerticesStored(
+ VERTICES_TO_UPDATE_PROGRESS);
+ nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
+ }
}
getPartitionStore().putPartition(partition);
++partitionIndex;
+ WorkerProgress.get().addVerticesStored(
+ verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
+ WorkerProgress.get().incrementPartitionsStored();
}
vertexWriter.close(getContext()); // the temp results are saved now
return null;
@@ -1147,6 +1173,11 @@ public class BspServiceWorker<I extends WritableComparable,
setCachedSuperstep(getSuperstep() - 1);
saveVertices(finishedSuperstepStats.getLocalVertexCount());
saveEdges();
+ WorkerProgress.get().finishStoring();
+ if (workerProgressWriter != null) {
+ WorkerProgress.writeToZnode(getZkExt(), myProgressPath);
+ workerProgressWriter.stop();
+ }
getPartitionStore().shutdown();
// All worker processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 8ec0453..828eac4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -174,6 +174,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
// Update status every EDGES_UPDATE_PERIOD edges
if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
+ WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
"readEdgeInputSplit: Loaded " +
totalEdgesMeter.count() + " edges at " +
@@ -198,6 +199,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
totalEdgesFiltered.inc(inputSplitEdgesFiltered);
totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
+ WorkerProgress.get().addEdgesLoaded(
+ inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
+ WorkerProgress.get().incrementEdgeInputSplitsLoaded();
+
return new VertexEdgeCount(0, inputSplitEdgesLoaded);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 01a6fc5..e3e04d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -176,6 +176,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
// Update status every VERTICES_UPDATE_PERIOD vertices
if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
+ WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
totalEdgesMeter.mark(edgesSinceLastUpdate);
inputSplitEdgesLoaded += edgesSinceLastUpdate;
edgesSinceLastUpdate = 0;
@@ -208,6 +209,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
vertexReader.close();
+
+ WorkerProgress.get().addVerticesLoaded(
+ inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
+ WorkerProgress.get().incrementVertexInputSplitsLoaded();
+
return new VertexEdgeCount(inputSplitVerticesLoaded,
inputSplitEdgesLoaded + edgesSinceLastUpdate);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
new file mode 100644
index 0000000..f7de88b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Stores information about a worker's progress that is periodically written to
+ * ZooKeeper with {@link WorkerProgressWriter}. All instance methods synchronize
+ * on the object, so it can be updated from multiple threads.
+ */
+@ThreadSafe
+public class WorkerProgress implements Writable {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(WorkerProgress.class);
+  /** Singleton instance for everyone to use */
+  private static final WorkerProgress INSTANCE = new WorkerProgress();
+
+  /** Superstep which worker is executing, Long.MAX_VALUE if it's output */
+  protected long currentSuperstep = -1;
+
+  /** How many vertices were loaded until now */
+  protected long verticesLoaded = 0;
+  /** How many vertex input splits were loaded until now */
+  protected int vertexInputSplitsLoaded = 0;
+  /** Whether worker finished loading vertices */
+  protected boolean loadingVerticesDone = false;
+  /** How many edges were loaded until now */
+  protected long edgesLoaded = 0;
+  /** How many edge input splits were loaded until now */
+  protected int edgeInputSplitsLoaded = 0;
+  /** Whether worker finished loading edges */
+  protected boolean loadingEdgesDone = false;
+
+  /** How many vertices are there to compute in current superstep */
+  protected long verticesToCompute = 0;
+  /** How many vertices were computed in current superstep until now */
+  protected long verticesComputed = 0;
+  /** How many partitions are there to compute in current superstep */
+  protected int partitionsToCompute = 0;
+  /** How many partitions were computed in current superstep until now */
+  protected int partitionsComputed = 0;
+  /** Whether all compute supersteps are done */
+  protected boolean computationDone = false;
+
+  /** How many vertices are there to store */
+  protected long verticesToStore = 0;
+  /** How many vertices were stored until now */
+  protected long verticesStored = 0;
+  /** How many partitions are there to store */
+  protected int partitionsToStore = 0;
+  /** How many partitions were stored until now */
+  protected int partitionsStored = 0;
+  /** Whether worker finished storing data */
+  protected boolean storingDone = false;
+
+  /**
+   * Get singleton instance of WorkerProgress.
+   *
+   * @return WorkerProgress singleton instance
+   */
+  public static WorkerProgress get() {
+    return INSTANCE;
+  }
+
+  /**
+   * Write worker's progress to znode. Best effort: failures are only logged,
+   * and an interrupt is re-asserted on the calling thread.
+   *
+   * @param zk ZooKeeperExt
+   * @param myProgressPath Path to write the progress to
+   */
+  public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) {
+    byte[] byteArray = WritableUtils.writeToByteArray(get());
+    try {
+      zk.createOrSetExt(myProgressPath, byteArray,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);
+    } catch (KeeperException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // don't swallow the interrupt
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("writeToZnode: " + e.getClass().getName() +
+            " exception occurred", e);
+      }
+    }
+  }
+
+  /** Check whether worker finished loading vertices */
+  public synchronized boolean isLoadingVerticesDone() {
+    return loadingVerticesDone;
+  }
+
+  /** Check whether worker finished loading edges */
+  public synchronized boolean isLoadingEdgesDone() {
+    return loadingEdgesDone;
+  }
+
+  /** Check whether all compute supersteps are done */
+  public synchronized boolean isComputationDone() {
+    return computationDone;
+  }
+
+  /** Check whether worker finished storing data */
+  public synchronized boolean isStoringDone() {
+    return storingDone;
+  }
+
+  /**
+   * Add number of vertices loaded
+   *
+   * @param verticesLoaded Vertices loaded since the previous call
+   */
+  public synchronized void addVerticesLoaded(long verticesLoaded) {
+    this.verticesLoaded += verticesLoaded;
+  }
+
+  /** Increment number of vertex input splits which were loaded */
+  public synchronized void incrementVertexInputSplitsLoaded() {
+    vertexInputSplitsLoaded++;
+  }
+
+  /** Notify this class that worker finished loading vertices */
+  public synchronized void finishLoadingVertices() {
+    loadingVerticesDone = true;
+  }
+
+  /**
+   * Add number of edges loaded
+   *
+   * @param edgesLoaded Edges loaded since the previous call
+   */
+  public synchronized void addEdgesLoaded(long edgesLoaded) {
+    this.edgesLoaded += edgesLoaded;
+  }
+
+  /** Increment number of edge input splits which were loaded */
+  public synchronized void incrementEdgeInputSplitsLoaded() {
+    edgeInputSplitsLoaded++;
+  }
+
+  /** Notify this class that worker finished loading edges */
+  public synchronized void finishLoadingEdges() {
+    loadingEdgesDone = true;
+  }
+
+  /**
+   * Notify this class that next computation superstep is starting
+   *
+   * @param superstep Superstep which is starting
+   * @param verticesToCompute How many vertices are there to compute
+   * @param partitionsToCompute How many partitions are there to compute
+   */
+  public synchronized void startSuperstep(long superstep,
+      long verticesToCompute, int partitionsToCompute) {
+    this.currentSuperstep = superstep;
+    this.verticesToCompute = verticesToCompute;
+    this.partitionsToCompute = partitionsToCompute;
+    verticesComputed = 0;
+    partitionsComputed = 0;
+  }
+
+  /**
+   * Add number of vertices computed
+   *
+   * @param verticesComputed Vertices computed since the previous call
+   */
+  public synchronized void addVerticesComputed(long verticesComputed) {
+    this.verticesComputed += verticesComputed;
+  }
+
+  /** Increment number of partitions which were computed */
+  public synchronized void incrementPartitionsComputed() {
+    partitionsComputed++;
+  }
+
+  /**
+   * Notify this class that computation is done and worker starts storing data
+   *
+   * @param verticesToStore How many vertices should be stored
+   * @param partitionsToStore How many partitions should be stored
+   */
+  public synchronized void startStoring(long verticesToStore,
+      int partitionsToStore) {
+    computationDone = true;
+    verticesToCompute = 0;
+    verticesComputed = 0;
+    partitionsToCompute = 0;
+    partitionsComputed = 0;
+    currentSuperstep = Long.MAX_VALUE;
+    this.verticesToStore = verticesToStore;
+    this.partitionsToStore = partitionsToStore;
+  }
+
+  /**
+   * Add number of vertices stored
+   *
+   * @param verticesStored Vertices stored since the previous call
+   */
+  public synchronized void addVerticesStored(long verticesStored) {
+    this.verticesStored += verticesStored;
+  }
+
+  /** Increment number of partitions which were stored */
+  public synchronized void incrementPartitionsStored() {
+    partitionsStored++;
+  }
+
+  /** Notify this class that storing data is done */
+  public synchronized void finishStoring() {
+    storingDone = true;
+  }
+
+  /** Get current superstep: -1 is input, Long.MAX_VALUE is output */
+  public synchronized long getCurrentSuperstep() {
+    return currentSuperstep;
+  }
+
+  /** Get how many vertices were loaded until now */
+  public synchronized long getVerticesLoaded() {
+    return verticesLoaded;
+  }
+
+  /** Get how many vertex input splits were loaded until now */
+  public synchronized int getVertexInputSplitsLoaded() {
+    return vertexInputSplitsLoaded;
+  }
+
+  /** Get how many edges were loaded until now */
+  public synchronized long getEdgesLoaded() {
+    return edgesLoaded;
+  }
+
+  /** Get how many edge input splits were loaded until now */
+  public synchronized int getEdgeInputSplitsLoaded() {
+    return edgeInputSplitsLoaded;
+  }
+
+  /** Get how many vertices are there to compute in current superstep */
+  public synchronized long getVerticesToCompute() {
+    return verticesToCompute;
+  }
+
+  /** Get how many vertices were computed in current superstep until now */
+  public synchronized long getVerticesComputed() {
+    return verticesComputed;
+  }
+
+  /** Get how many partitions are there to compute in current superstep */
+  public synchronized int getPartitionsToCompute() {
+    return partitionsToCompute;
+  }
+
+  /** Get how many partitions were computed in current superstep until now */
+  public synchronized int getPartitionsComputed() {
+    return partitionsComputed;
+  }
+
+  /** Get how many vertices are there to store */
+  public synchronized long getVerticesToStore() {
+    return verticesToStore;
+  }
+
+  /** Get how many vertices were stored until now */
+  public synchronized long getVerticesStored() {
+    return verticesStored;
+  }
+
+  /** Get how many partitions are there to store */
+  public synchronized int getPartitionsToStore() {
+    return partitionsToStore;
+  }
+
+  /** Get how many partitions were stored until now */
+  public synchronized int getPartitionsStored() {
+    return partitionsStored;
+  }
+
+  /** Check whether worker is still in input superstep (superstep -1) */
+  public synchronized boolean isInputSuperstep() {
+    return currentSuperstep == -1;
+  }
+
+  /** Check whether worker is in one of the compute supersteps */
+  public synchronized boolean isComputeSuperstep() {
+    return currentSuperstep >= 0 && currentSuperstep < Long.MAX_VALUE;
+  }
+
+  /** Check whether worker started storing data (output superstep) */
+  public synchronized boolean isOutputSuperstep() {
+    return currentSuperstep == Long.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeLong(currentSuperstep);
+
+    dataOutput.writeLong(verticesLoaded);
+    dataOutput.writeInt(vertexInputSplitsLoaded);
+    dataOutput.writeBoolean(loadingVerticesDone);
+    dataOutput.writeLong(edgesLoaded);
+    dataOutput.writeInt(edgeInputSplitsLoaded);
+    dataOutput.writeBoolean(loadingEdgesDone);
+
+    dataOutput.writeLong(verticesToCompute);
+    dataOutput.writeLong(verticesComputed);
+    dataOutput.writeInt(partitionsToCompute);
+    dataOutput.writeInt(partitionsComputed);
+    dataOutput.writeBoolean(computationDone);
+
+    dataOutput.writeLong(verticesToStore);
+    dataOutput.writeLong(verticesStored);
+    dataOutput.writeInt(partitionsToStore);
+    dataOutput.writeInt(partitionsStored);
+    dataOutput.writeBoolean(storingDone);
+  }
+
+  @Override
+  public synchronized void readFields(DataInput dataInput) throws IOException {
+    currentSuperstep = dataInput.readLong();
+
+    verticesLoaded = dataInput.readLong();
+    vertexInputSplitsLoaded = dataInput.readInt();
+    loadingVerticesDone = dataInput.readBoolean();
+    edgesLoaded = dataInput.readLong();
+    edgeInputSplitsLoaded = dataInput.readInt();
+    loadingEdgesDone = dataInput.readBoolean();
+
+    verticesToCompute = dataInput.readLong();
+    verticesComputed = dataInput.readLong();
+    partitionsToCompute = dataInput.readInt();
+    partitionsComputed = dataInput.readInt();
+    computationDone = dataInput.readBoolean();
+
+    verticesToStore = dataInput.readLong();
+    verticesStored = dataInput.readLong();
+    partitionsToStore = dataInput.readInt();
+    partitionsStored = dataInput.readInt();
+    storingDone = dataInput.readBoolean();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
new file mode 100644
index 0000000..f8c7571
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.log4j.Logger;
+
+/** Class which periodically writes worker's progress to zookeeper */
+public class WorkerProgressWriter {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WorkerProgressWriter.class);
+  /** How often to update worker's progress */
+  private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
+
+  /** Thread which writes worker's progress */
+  private final Thread writerThread;
+  /** Whether worker finished application */
+  private volatile boolean finished = false;
+
+  /**
+   * Constructor, starts separate daemon thread to periodically update
+   * worker's progress
+   *
+   * @param myProgressPath Path where this worker's progress should be stored
+   * @param zk ZooKeeperExt
+   */
+  public WorkerProgressWriter(final String myProgressPath,
+      final ZooKeeperExt zk) {
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (!finished) {
+            WorkerProgress.writeToZnode(zk, myProgressPath);
+            // Sleep 1x-2x the period, presumably to spread out writes
+            double factor = 1 + Math.random();
+            Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
+          }
+        } catch (InterruptedException e) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("run: WorkerProgressWriter interrupted", e);
+          }
+        }
+      }
+    });
+    // Daemon: this thread must not keep the JVM alive if the worker dies
+    // before stop() is called
+    writerThread.setDaemon(true);
+    writerThread.start();
+  }
+
+  /** Stop the thread which writes worker's progress */
+  public void stop() {
+    finished = true;
+    writerThread.interrupt();
+  }
+}