You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:13 UTC
[30/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-944: Improve job tracking on command line
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4485e563
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4485e563
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4485e563
Branch: refs/heads/release-1.1
Commit: 4485e563a6582afb1c848ea80888fdad50ada516
Parents: de0efb0
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Aug 26 11:35:14 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Aug 26 14:42:52 2014 -0700
----------------------------------------------------------------------
giraph-core/pom.xml | 24 +-
.../java/org/apache/giraph/bsp/BspService.java | 12 +-
.../apache/giraph/bsp/CentralizedService.java | 9 +
.../org/apache/giraph/conf/GiraphConstants.java | 2 +-
.../apache/giraph/graph/GraphTaskManager.java | 30 +++
.../giraph/graph/JobProgressTrackerClient.java | 33 +++
.../graph/JobProgressTrackerClientNoOp.java | 47 ++++
.../RetryableJobProgressTrackerClient.java | 175 ++++++++++++++
.../java/org/apache/giraph/job/GiraphJob.java | 16 +-
.../apache/giraph/job/JobProgressTracker.java | 155 +++----------
.../giraph/job/JobProgressTrackerService.java | 193 +++++++++++++++
.../apache/giraph/master/BspServiceMaster.java | 10 +-
.../org/apache/giraph/master/MasterCompute.java | 30 ++-
.../apache/giraph/scripting/ScriptLoader.java | 6 +-
.../java/org/apache/giraph/utils/FileUtils.java | 2 +-
.../apache/giraph/worker/BspServiceWorker.java | 3 +-
.../org/apache/giraph/worker/WorkerContext.java | 10 +
.../apache/giraph/worker/WorkerProgress.java | 232 ++++++++++---------
.../giraph/worker/WorkerProgressWriter.java | 30 +--
.../org/apache/giraph/zk/ZooKeeperManager.java | 4 +-
.../test/java/org/apache/giraph/BspCase.java | 4 +-
giraph-examples/pom.xml | 8 -
.../java/org/apache/giraph/TestBspBasic.java | 4 +-
.../giraph/hive/jython/HiveJythonUtils.java | 2 +-
pom.xml | 81 ++++++-
25 files changed, 825 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index b66ba1d..23f6666 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -289,14 +289,6 @@ under the License.
</plugin>
</plugins>
</build>
- <dependencies>
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${dep.oldnetty.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
</profile>
<profile>
@@ -467,6 +459,22 @@ under the License.
<dependencies>
<!-- compile dependencies. sorted lexicographically. -->
<dependency>
+ <groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-service</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.facebook.thirdparty.yourkit-api</groupId>
<artifactId>yjp-controller-api-redist</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index c418a89..2a50489 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.worker.WorkerInfo;
@@ -161,8 +162,6 @@ public abstract class BspService<I extends WritableComparable,
"/_partitionExchangeDir";
/** Denotes that the superstep is done */
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
- /** Stores progress info for workers */
- public static final String WORKER_PROGRESSES = "/_workerProgresses";
/** Denotes that computation should be halted */
public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
/** User sets this flag to checkpoint and stop the job */
@@ -241,8 +240,6 @@ public abstract class BspService<I extends WritableComparable,
protected final String savedCheckpointBasePath;
/** Path to the master election path */
protected final String masterElectionPath;
- /** Stores progress info of this worker */
- protected final String myProgressPath;
/** If this path exists computation will be halted */
protected final String haltComputationPath;
/** Private ZooKeeper instance that implements the service */
@@ -363,7 +360,6 @@ public abstract class BspService<I extends WritableComparable,
getCheckpointBasePath(getConfiguration(), getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
- myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
String serverPortList = conf.getZookeeperList();
haltComputationPath = basePath + HALT_COMPUTATION_NODE;
getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
@@ -404,7 +400,6 @@ public abstract class BspService<I extends WritableComparable,
"BspService: Invalid superstep to restart - " +
restartedSuperstep);
}
-
}
/**
@@ -1258,6 +1253,11 @@ public abstract class BspService<I extends WritableComparable,
return lastCheckpointedSuperstep;
}
+ @Override
+ public JobProgressTracker getJobProgressTracker() {
+ return getGraphTaskManager().getJobProgressTracker();
+ }
+
/**
* Only get the finalized checkpoint files
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 560f1fb..0cadfb7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -19,6 +19,8 @@
package org.apache.giraph.bsp;
import java.util.List;
+
+import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -56,4 +58,11 @@ public interface CentralizedService<I extends WritableComparable,
* @return List of workers
*/
List<WorkerInfo> getWorkerInfoList();
+
+ /**
+ * Get JobProgressTracker to report progress to
+ *
+ * @return JobProgressTracker
+ */
+ JobProgressTracker getJobProgressTracker();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index da0a8db..d1fdf57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1119,7 +1119,7 @@ public interface GiraphConstants {
/** Whether to track job progress on client or not */
BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT =
- new BooleanConfOption("giraph.trackJobProgressOnClient", true,
+ new BooleanConfOption("giraph.trackJobProgressOnClient", false,
"Whether to track job progress on client or not");
/** Number of retries for creating the HDFS files */
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 8a97939..ba5d2fa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -25,6 +25,7 @@ import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.scripting.ScriptLoader;
import org.apache.giraph.master.BspServiceMaster;
import org.apache.giraph.master.MasterAggregatorUsage;
@@ -69,6 +70,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -122,6 +124,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/** Superstep stats */
private FinishedSuperstepStats finishedSuperstepStats =
new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
+ /** Job progress tracker */
+ private JobProgressTrackerClient jobProgressTracker;
// Per-Job Metrics
/** Timer for WorkerContext#preApplication() */
@@ -194,6 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
context.setStatus("setup: Beginning worker setup.");
Configuration hadoopConf = context.getConfiguration();
conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
+ initializeJobProgressTracker();
// Write user's graph types (I,V,E,M) back to configuration parameters so
// that they are set for quicker access later. These types are often
// inferred from the Computation class used.
@@ -245,6 +250,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
/**
+ * Create and connect a client to JobProgressTrackerService,
+ * or no-op implementation if progress shouldn't be tracked or something
+ * goes wrong
+ */
+ private void initializeJobProgressTracker() {
+ if (!conf.trackJobProgressOnClient()) {
+ jobProgressTracker = new JobProgressTrackerClientNoOp();
+ } else {
+ try {
+ jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("createJobProgressClient: Exception occurred while trying to" +
+ " connect to JobProgressTracker - not reporting progress", e);
+ jobProgressTracker = new JobProgressTrackerClientNoOp();
+ }
+ }
+ jobProgressTracker.mapperStarted();
+ }
+
+ /**
* Perform the work assigned to this compute node for this job run.
* 1) Run checkpoint per frequency policy.
* 2) For every vertex on this mapper, run the compute() function
@@ -485,6 +510,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
return serviceWorker.getWorkerContext();
}
+ public JobProgressTracker getJobProgressTracker() {
+ return jobProgressTracker;
+ }
+
/**
* Copied from JobConf to get the location of this jar. Workaround for
* things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
@@ -878,6 +907,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (LOG.isInfoEnabled()) {
LOG.info("cleanup: Starting for " + getGraphFunctions());
}
+ jobProgressTracker.cleanup();
if (done) {
return;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
new file mode 100644
index 0000000..c302d9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.job.JobProgressTracker;
+
+import java.io.IOException;
+
+/**
+ * Client-side view of JobProgressTracker: adds a cleanup hook so that
+ * implementations holding connections to the tracker can close them
+ * when they are no longer needed.
+ */
+public interface JobProgressTrackerClient extends JobProgressTracker {
+ /** Close the connections if any */
+ void cleanup() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
new file mode 100644
index 0000000..d75fd42
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.worker.WorkerProgress;
+
+/**
+ * Class to use for JobProgressTracker client when progress shouldn't be
+ * tracked or something goes wrong
+ */
+public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient {
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void mapperStarted() {
+ }
+
+ @Override
+ public void logInfo(String logLine) {
+ }
+
+ @Override
+ public void logFailure(String reason) {
+ }
+
+ @Override
+ public void updateProgress(WorkerProgress workerProgress) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
new file mode 100644
index 0000000..f15a2e7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.job.JobProgressTracker;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.RuntimeTTransportException;
+import com.facebook.swift.service.ThriftClientManager;
+import com.google.common.io.Closeables;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Wrapper around JobProgressTracker which retries to connect and swallows
+ * exceptions so app wouldn't crash if something goes wrong with progress
+ * reports.
+ */
+public class RetryableJobProgressTrackerClient
+ implements JobProgressTrackerClient {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(RetryableJobProgressTrackerClient.class);
+ /** Configuration */
+ private final GiraphConfiguration conf;
+ /** Thrift client manager to use to connect to job progress tracker */
+ private ThriftClientManager clientManager;
+ /** Job progress tracker */
+ private JobProgressTracker jobProgressTracker;
+
+ /**
+ * Constructor
+ *
+ * @param conf Giraph configuration
+ */
+ public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
+ ExecutionException, InterruptedException {
+ this.conf = conf;
+ resetConnection();
+ }
+
+ /**
+ * Try to establish new connection to JobProgressTracker
+ */
+ private void resetConnection() throws ExecutionException,
+ InterruptedException {
+ clientManager = new ThriftClientManager();
+ FramedClientConnector connector =
+ new FramedClientConnector(new InetSocketAddress(
+ JOB_PROGRESS_SERVICE_HOST.get(conf),
+ JOB_PROGRESS_SERVICE_PORT.get(conf)));
+ jobProgressTracker =
+ clientManager.createClient(connector, JobProgressTracker.class).get();
+
+ }
+
+ @Override
+ public synchronized void cleanup() throws IOException {
+ Closeables.close(clientManager, true);
+ try {
+ clientManager.close();
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Exception occurred while trying to close JobProgressTracker", e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void mapperStarted() {
+ executeWithRetry(new Runnable() {
+ @Override
+ public void run() {
+ jobProgressTracker.mapperStarted();
+ }
+ });
+ }
+
+ @Override
+ public synchronized void logInfo(final String logLine) {
+ executeWithRetry(new Runnable() {
+ @Override
+ public void run() {
+ jobProgressTracker.logInfo(logLine);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void logFailure(final String reason) {
+ executeWithRetry(new Runnable() {
+ @Override
+ public void run() {
+ jobProgressTracker.logFailure(reason);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void updateProgress(final WorkerProgress workerProgress) {
+ executeWithRetry(new Runnable() {
+ @Override
+ public void run() {
+ jobProgressTracker.updateProgress(workerProgress);
+ }
+ });
+ }
+
+ /**
+ * Execute Runnable, if disconnected try to connect again and retry
+ *
+ * @param runnable Runnable to execute
+ */
+ private void executeWithRetry(Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (RuntimeTTransportException te) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RuntimeTTransportException occurred while talking to " +
+ "JobProgressTracker server, trying to reconnect", te);
+ }
+ try {
+ try {
+ clientManager.close();
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("");
+ }
+ }
+ resetConnection();
+ runnable.run();
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Exception occurred while talking to " +
+ "JobProgressTracker server, giving up", e);
+ }
+ }
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Exception occurred while talking to " +
+ "JobProgressTracker server, giving up", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 436126b..93aa679 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -239,6 +239,9 @@ public class GiraphJob {
int tryCount = 0;
GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
while (true) {
+ JobProgressTrackerService jobProgressTrackerService =
+ JobProgressTrackerService.createJobProgressServer(conf);
+
tryCount++;
Job submittedJob = new Job(conf, jobName);
if (submittedJob.getJar() == null) {
@@ -253,16 +256,17 @@ public class GiraphJob {
jobObserver.launchingJob(submittedJob);
submittedJob.submit();
if (LOG.isInfoEnabled()) {
- LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
+ LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
+ LOG.info(
+ "Waiting for resources... Job will start only when it gets all " +
+ (conf.getMinWorkers() + 1) + " mappers");
}
- HaltApplicationUtils.printHaltInfo(submittedJob, conf);
- JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ?
- new JobProgressTracker(submittedJob, conf) : null;
jobObserver.jobRunning(submittedJob);
+ HaltApplicationUtils.printHaltInfo(submittedJob, conf);
boolean passed = submittedJob.waitForCompletion(verbose);
- if (jobProgressTracker != null) {
- jobProgressTracker.stop();
+ if (jobProgressTrackerService != null) {
+ jobProgressTrackerService.stop(passed);
}
jobObserver.jobFinished(submittedJob, passed);
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 6971174..95bc56d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -18,138 +18,53 @@
package org.apache.giraph.job;
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.utils.CounterUtils;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.worker.WorkerProgress;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.facebook.swift.service.ThriftMethod;
+import com.facebook.swift.service.ThriftService;
/**
- * Class which tracks job's progress on client
+ * Interface for job progress tracker on job client
*/
-public class JobProgressTracker implements Watcher {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(JobProgressTracker.class);
- /** How often to print job's progress */
- private static final int UPDATE_MILLISECONDS = 5 * 1000;
- /** Thread which periodically writes job's progress */
- private Thread writerThread;
- /** ZooKeeperExt */
- private ZooKeeperExt zk;
- /** Whether application is finished */
- private volatile boolean finished = false;
+@ThriftService
+public interface JobProgressTracker {
+ /** Host on which job progress service runs */
+ StrConfOption JOB_PROGRESS_SERVICE_HOST =
+ new StrConfOption("giraph.jobProgressServiceHost", null,
+ "Host on which job progress service runs");
+ /** Port which job progress service uses */
+ IntConfOption JOB_PROGRESS_SERVICE_PORT =
+ new IntConfOption("giraph.jobProgressServicePort", -1,
+ "Port which job progress service uses");
+
+ /** Notify JobProgressTracker that mapper started */
+ @ThriftMethod
+ void mapperStarted();
/**
- * Constructor
+ * Call this when you want to log an info line from any mapper to command line
*
- * @param submittedJob Job to track
- * @param conf Configuration
+ * @param logLine Line to log
*/
- public JobProgressTracker(final Job submittedJob,
- final GiraphConfiguration conf) throws IOException, InterruptedException {
- String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
- submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
- final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
- submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
- // Connect to ZooKeeper
- if (zkServer != null && basePath != null) {
- zk = new ZooKeeperExt(
- zkServer,
- conf.getZooKeeperSessionTimeout(),
- conf.getZookeeperOpsMaxAttempts(),
- conf.getZookeeperOpsRetryWaitMsecs(),
- this,
- new Progressable() {
- @Override
- public void progress() {
- }
- });
- writerThread = new Thread(new Runnable() {
- @Override
- public void run() {
- String workerProgressBasePath = basePath +
- BspService.WORKER_PROGRESSES;
- try {
- while (!finished) {
- if (zk.exists(workerProgressBasePath, false) != null) {
- // Get locations of all worker progresses
- List<String> workerProgressPaths = zk.getChildrenExt(
- workerProgressBasePath, false, false, true);
- List<WorkerProgress> workerProgresses =
- new ArrayList<WorkerProgress>(workerProgressPaths.size());
- // Read all worker progresses
- for (String workerProgressPath : workerProgressPaths) {
- WorkerProgress workerProgress = new WorkerProgress();
- byte[] zkData = zk.getData(workerProgressPath, false, null);
- WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
- workerProgresses.add(workerProgress);
- }
- // Combine and log
- CombinedWorkerProgress combinedWorkerProgress =
- new CombinedWorkerProgress(workerProgresses);
- if (LOG.isInfoEnabled()) {
- LOG.info(combinedWorkerProgress.toString());
- }
- // Check if application is done
- if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
- break;
- }
- }
- Thread.sleep(UPDATE_MILLISECONDS);
- }
- // CHECKSTYLE: stop IllegalCatchCheck
- } catch (Exception e) {
- // CHECKSTYLE: resume IllegalCatchCheck
- if (LOG.isInfoEnabled()) {
- LOG.info("run: Exception occurred", e);
- }
- } finally {
- try {
- // Create a node so master knows we stopped communicating with
- // ZooKeeper and it's safe to cleanup
- zk.createExt(
- basePath + BspService.CLEANED_UP_DIR + "/client",
- null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- zk.close();
- // CHECKSTYLE: stop IllegalCatchCheck
- } catch (Exception e) {
- // CHECKSTYLE: resume IllegalCatchCheck
- if (LOG.isInfoEnabled()) {
- LOG.info("run: Exception occurred", e);
- }
- }
- }
- }
- });
- writerThread.start();
- }
- }
+ @ThriftMethod
+ void logInfo(String logLine);
/**
- * Stop the thread which logs application progress
+ * Notify that job is failing
+ *
+ * @param reason Reason for failure
*/
- public void stop() {
- finished = true;
- }
+ @ThriftMethod
+ void logFailure(String reason);
- @Override
- public void process(WatchedEvent event) {
- }
+ /**
+ * Workers should call this method to update their progress
+ *
+ * @param workerProgress Progress of the worker
+ */
+ @ThriftMethod
+ void updateProgress(WorkerProgress workerProgress);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
new file mode 100644
index 0000000..3a896e2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import com.facebook.swift.codec.ThriftCodecManager;
+import com.facebook.swift.service.ThriftEventHandler;
+import com.facebook.swift.service.ThriftServer;
+import com.facebook.swift.service.ThriftServerConfig;
+import com.facebook.swift.service.ThriftServiceProcessor;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of job progress tracker service on job client.
+ *
+ * Progress reported through the {@link JobProgressTracker} methods is kept
+ * in memory, and a background thread periodically logs the combined
+ * progress of all workers until the job finishes or fails.
+ */
+public class JobProgressTrackerService implements JobProgressTracker {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(JobProgressTrackerService.class);
+  /** How often to print job's progress */
+  private static final int UPDATE_MILLISECONDS = 10 * 1000;
+
+  /** Configuration */
+  private final GiraphConfiguration conf;
+  /** Thread which periodically writes job's progress */
+  private Thread writerThread;
+  /** Whether application is finished */
+  private volatile boolean finished = false;
+  /** Server which uses this service, null until the server is created */
+  private ThriftServer server;
+  /** Number of mappers which the job got, guarded by this object's lock */
+  private int mappersStarted;
+  /** Last time number of mappers started was logged */
+  private long lastTimeMappersStartedLogged;
+  /** Map of worker progresses, keyed by the task id of the worker */
+  private final Map<Integer, WorkerProgress> workerProgresses =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Constructor, starts the thread which periodically logs job progress
+   *
+   * @param conf Configuration
+   */
+  public JobProgressTrackerService(GiraphConfiguration conf) {
+    this.conf = conf;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Waiting for job to start... (this may take a minute)");
+    }
+    startWriterThread();
+  }
+
+  /**
+   * Start the thread which writes progress periodically
+   */
+  private void startWriterThread() {
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!finished) {
+          // Nothing is logged until every mapper has reported in and at
+          // least one worker has sent its progress
+          if (allMappersStarted() && !workerProgresses.isEmpty()) {
+            // Combine and log
+            CombinedWorkerProgress combinedWorkerProgress =
+                new CombinedWorkerProgress(workerProgresses.values());
+            if (LOG.isInfoEnabled()) {
+              LOG.info(combinedWorkerProgress.toString());
+            }
+            // Check if application is done
+            if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+              break;
+            }
+          }
+          try {
+            Thread.sleep(UPDATE_MILLISECONDS);
+          } catch (InterruptedException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Progress thread interrupted");
+            }
+            // Keep the interrupt status visible instead of swallowing it
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    });
+    writerThread.start();
+  }
+
+  /**
+   * Check whether all expected mappers (max workers plus one) have started.
+   * Synchronized so the writer thread observes the count updated by
+   * {@link #mapperStarted()}.
+   *
+   * @return True iff the number of started mappers equals max workers + 1
+   */
+  private synchronized boolean allMappersStarted() {
+    return mappersStarted == conf.getMaxWorkers() + 1;
+  }
+
+  @Override
+  public synchronized void mapperStarted() {
+    mappersStarted++;
+    if (LOG.isInfoEnabled()) {
+      if (allMappersStarted()) {
+        LOG.info("Got all " + mappersStarted + " mappers");
+      } else {
+        // Rate limit the "still waiting" message to one per update period
+        if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
+            UPDATE_MILLISECONDS) {
+          lastTimeMappersStartedLogged = System.currentTimeMillis();
+          LOG.info("Got " + mappersStarted + " but needs " +
+              (conf.getMaxWorkers() + 1) + " mappers");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void logInfo(String logLine) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(logLine);
+    }
+  }
+
+  @Override
+  public void logFailure(String reason) {
+    LOG.fatal(reason);
+    stopWriterThread();
+  }
+
+  @Override
+  public void updateProgress(WorkerProgress workerProgress) {
+    workerProgresses.put(workerProgress.getTaskId(), workerProgress);
+  }
+
+  /**
+   * Mark the application as finished and wake up the writer thread so it
+   * exits without waiting for its sleep to elapse
+   */
+  private void stopWriterThread() {
+    finished = true;
+    writerThread.interrupt();
+  }
+
+  /**
+   * Stop the writer thread and close the server, if one was created
+   */
+  private void releaseResources() {
+    stopWriterThread();
+    // The server is assigned after construction, so it can still be null
+    if (server != null) {
+      server.close();
+    }
+  }
+
+  /**
+   * Stop the thread which logs application progress and server
+   *
+   * @param succeeded Whether job succeeded or not
+   */
+  public void stop(boolean succeeded) {
+    releaseResources();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
+          ", cleaning up...");
+    }
+  }
+
+  /**
+   * Create job progress server on job client, and update configuration with
+   * its hostname and port so mappers would know what to connect to. Returns
+   * null if progress shouldn't be tracked, or if the server could not be
+   * created; in the latter case anything already started is shut down.
+   *
+   * @param conf Configuration
+   * @return JobProgressTrackerService, or null if progress is not tracked
+   */
+  public static JobProgressTrackerService createJobProgressServer(
+      GiraphConfiguration conf) {
+    if (!conf.trackJobProgressOnClient()) {
+      return null;
+    }
+    JobProgressTrackerService service = null;
+    try {
+      service = new JobProgressTrackerService(conf);
+      ThriftServiceProcessor processor =
+          new ThriftServiceProcessor(new ThriftCodecManager(),
+              new ArrayList<ThriftEventHandler>(), service);
+      service.server = new ThriftServer(processor, new ThriftServerConfig());
+      service.server.start();
+      JOB_PROGRESS_SERVICE_HOST.set(conf,
+          InetAddress.getLocalHost().getHostName());
+      JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort());
+      return service;
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.warn("Exception occurred while trying to create " +
+          "JobProgressTrackerService - not using progress reporting", e);
+      // The constructor already started the writer thread, and the server
+      // may have been created or started; without this cleanup they would
+      // outlive the failed setup.
+      // NOTE(review): assumes ThriftServer.close() is safe to call on a
+      // server which was created but not started - confirm
+      if (service != null) {
+        service.releaseResources();
+      }
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 671df23..efa5b87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -307,6 +307,7 @@ public class BspServiceMaster<I extends WritableComparable,
* @param reason The reason the job failed
*/
private void setJobStateFailed(String reason) {
+ getGraphTaskManager().getJobProgressTracker().logFailure(reason);
setJobState(ApplicationState.FAILED, -1, -1, false);
failJob(new IllegalStateException(reason));
}
@@ -644,7 +645,8 @@ public class BspServiceMaster<I extends WritableComparable,
"check input of " + inputFormat.getClass().getName() + "!");
getContext().setStatus("Failing job due to 0 input splits, " +
"check input of " + inputFormat.getClass().getName() + "!");
- setJobStateFailed("0 input splits");
+ setJobStateFailed("Please check your input tables - partitions which " +
+ "you specified are missing. Failing the job!!!");
}
if (minSplitCountHint > splitList.size()) {
LOG.warn(logPrefix + ": Number of inputSplits=" +
@@ -885,7 +887,7 @@ public class BspServiceMaster<I extends WritableComparable,
getContext());
aggregatorHandler.initialize(this);
masterCompute = getConfiguration().createMasterCompute();
- masterCompute.setMasterAggregatorUsage(aggregatorHandler);
+ masterCompute.setMasterService(this);
masterInfo = new MasterInfo();
masterServer =
@@ -1790,10 +1792,6 @@ public class BspServiceMaster<I extends WritableComparable,
GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
maxTasks *= 2;
}
- if (getConfiguration().trackJobProgressOnClient()) {
- // For job client
- maxTasks++;
- }
List<String> cleanedUpChildrenList = null;
while (true) {
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index c2a1f9a..552cca9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -19,6 +19,7 @@
package org.apache.giraph.master;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Computation;
@@ -46,7 +47,7 @@ public abstract class MasterCompute
/** If true, do not do anymore computation on this vertex. */
private boolean halt = false;
/** Master aggregator usage */
- private MasterAggregatorUsage masterAggregatorUsage;
+ private CentralizedServiceMaster serviceMaster;
/** Graph state */
private GraphState graphState;
/**
@@ -192,14 +193,16 @@ public abstract class MasterCompute
public final <A extends Writable> boolean registerAggregator(
String name, Class<? extends Aggregator<A>> aggregatorClass)
throws InstantiationException, IllegalAccessException {
- return masterAggregatorUsage.registerAggregator(name, aggregatorClass);
+ return serviceMaster.getAggregatorHandler().registerAggregator(
+ name, aggregatorClass);
}
@Override
public final <A extends Writable> boolean registerAggregator(
String name, WritableFactory<? extends Aggregator<A>> aggregator)
throws InstantiationException, IllegalAccessException {
- return masterAggregatorUsage.registerAggregator(name, aggregator);
+ return serviceMaster.getAggregatorHandler().registerAggregator(
+ name, aggregator);
}
@Override
@@ -207,28 +210,37 @@ public abstract class MasterCompute
String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
- return masterAggregatorUsage.registerPersistentAggregator(
+ return serviceMaster.getAggregatorHandler().registerPersistentAggregator(
name, aggregatorClass);
}
@Override
public final <A extends Writable> A getAggregatedValue(String name) {
- return masterAggregatorUsage.<A>getAggregatedValue(name);
+ return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name);
}
@Override
public final <A extends Writable> void setAggregatedValue(
String name, A value) {
- masterAggregatorUsage.setAggregatedValue(name, value);
+ serviceMaster.getAggregatorHandler().setAggregatedValue(name, value);
+ }
+
+ /**
+ * Call this to log a line to command line of the job. The line is passed
+ * to the job progress tracker obtained from the master service. Use in
+ * moderation - it's a synchronous call to Job client
+ *
+ * NOTE(review): serviceMaster appears to be assigned only through
+ * setMasterService - confirm this cannot be called before that happens
+ *
+ * @param line Line to print
+ */
+ public void logToCommandLine(String line) {
+ serviceMaster.getJobProgressTracker().logInfo(line);
+ }
final void setGraphState(GraphState graphState) {
this.graphState = graphState;
}
- final void setMasterAggregatorUsage(MasterAggregatorUsage
- masterAggregatorUsage) {
- this.masterAggregatorUsage = masterAggregatorUsage;
+ final void setMasterService(CentralizedServiceMaster serviceMaster) {
+ this.serviceMaster = serviceMaster;
}
final void setSuperstepClasses(SuperstepClasses superstepClasses) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
index 2b30739..f78b1a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
@@ -150,7 +150,7 @@ public class ScriptLoader {
*
* @param conf Configuration
*/
- public static void loadScripts(Configuration conf) {
+ public static void loadScripts(Configuration conf) throws IOException {
List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
if (deployedScripts == null) {
return;
@@ -167,7 +167,7 @@ public class ScriptLoader {
* @param deployedScript the deployed script
*/
public static void loadScript(Configuration conf,
- DeployedScript deployedScript) {
+ DeployedScript deployedScript) throws IOException {
InputStream stream = openScriptInputStream(conf, deployedScript);
switch (deployedScript.getLanguage()) {
case JYTHON:
@@ -180,7 +180,7 @@ public class ScriptLoader {
}
LOADED_SCRIPTS.add(deployedScript);
- Closeables.closeQuietly(stream);
+ Closeables.close(stream, true);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
index 6e8b1e3..0f9a08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
@@ -129,7 +129,7 @@ public class FileUtils {
writer.write('\n');
}
} finally {
- Closeables.closeQuietly(writer);
+ Closeables.close(writer, true);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 447bb6f..120678f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -229,7 +229,8 @@ public class BspServiceWorker<I extends WritableComparable,
WorkerProgress.get().setTaskId(getTaskPartition());
workerProgressWriter = conf.trackJobProgressOnClient() ?
- new WorkerProgressWriter(myProgressPath, getZkExt()) : null;
+ new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
+ null;
GiraphMetrics.get().addSuperstepResetObserver(this);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index aca9944..7a55d56 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -206,6 +206,16 @@ public abstract class WorkerContext
return workerAggregatorUsage.<A>getAggregatedValue(name);
}
+ /**
+ * Call this to log a line to command line of the job. The line is passed
+ * to the job progress tracker obtained from the worker service. Use in
+ * moderation - it's a synchronous call to Job client
+ *
+ * @param line Line to print
+ */
+ public void logToCommandLine(String line) {
+ serviceWorker.getJobProgressTracker().logInfo(line);
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index 1a2a6ee..24f791b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -19,17 +19,9 @@
package org.apache.giraph.worker;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
import javax.annotation.concurrent.ThreadSafe;
@@ -38,9 +30,8 @@ import javax.annotation.concurrent.ThreadSafe;
* ZooKeeper with {@link WorkerProgressWriter}.
*/
@ThreadSafe
-public class WorkerProgress implements Writable {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(WorkerProgress.class);
+@ThriftStruct
+public class WorkerProgress {
/** Singleton instance for everyone to use */
private static final WorkerProgress INSTANCE = new WorkerProgress();
@@ -99,45 +90,6 @@ public class WorkerProgress implements Writable {
}
/**
- * Write worker's progress to znode
- *
- * @param zk ZooKeeperExt
- * @param myProgressPath Path to write the progress to
- */
- public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) {
- byte[] byteArray = WritableUtils.writeToByteArray(get());
- try {
- zk.createOrSetExt(myProgressPath,
- byteArray,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true,
- -1);
- } catch (KeeperException | InterruptedException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("writeToZnode: " + e.getClass().getName() +
- " exception occurred", e);
- }
- }
- }
-
- public synchronized boolean isLoadingVerticesDone() {
- return loadingVerticesDone;
- }
-
- public synchronized boolean isLoadingEdgesDone() {
- return loadingEdgesDone;
- }
-
- public synchronized boolean isComputationDone() {
- return computationDone;
- }
-
- public synchronized boolean isStoringDone() {
- return storingDone;
- }
-
- /**
* Add number of vertices loaded
*
* @param verticesLoaded How many vertices were loaded since the last
@@ -188,8 +140,8 @@ public class WorkerProgress implements Writable {
/**
* Notify this class that next computation superstep is starting
*
- * @param superstep Superstep which is starting
- * @param verticesToCompute How many vertices are there to compute
+ * @param superstep Superstep which is starting
+ * @param verticesToCompute How many vertices are there to compute
* @param partitionsToCompute How many partitions are there to compute
*/
public synchronized void startSuperstep(long superstep,
@@ -221,7 +173,7 @@ public class WorkerProgress implements Writable {
/**
* Notify this class that worker is starting to store data
*
- * @param verticesToStore How many vertices should be stored
+ * @param verticesToStore How many vertices should be stored
* @param partitionsToStore How many partitions should be stored
*/
public synchronized void startStoring(long verticesToStore,
@@ -260,10 +212,6 @@ public class WorkerProgress implements Writable {
storingDone = true;
}
- public synchronized void setTaskId(int taskId) {
- this.taskId = taskId;
- }
-
/**
* Update memory info
*/
@@ -271,58 +219,101 @@ public class WorkerProgress implements Writable {
freeMemoryMB = MemoryUtils.freeMemoryMB();
}
+ @ThriftField(1)
public synchronized long getCurrentSuperstep() {
return currentSuperstep;
}
+ @ThriftField(2)
public synchronized long getVerticesLoaded() {
return verticesLoaded;
}
+ @ThriftField(3)
public synchronized int getVertexInputSplitsLoaded() {
return vertexInputSplitsLoaded;
}
+ @ThriftField(4)
+ public synchronized boolean isLoadingVerticesDone() {
+ return loadingVerticesDone;
+ }
+
+ @ThriftField(5)
public synchronized long getEdgesLoaded() {
return edgesLoaded;
}
+ @ThriftField(6)
public synchronized int getEdgeInputSplitsLoaded() {
return edgeInputSplitsLoaded;
}
+ @ThriftField(7)
+ public synchronized boolean isLoadingEdgesDone() {
+ return loadingEdgesDone;
+ }
+
+ @ThriftField(8)
public synchronized long getVerticesToCompute() {
return verticesToCompute;
}
+ @ThriftField(9)
public synchronized long getVerticesComputed() {
return verticesComputed;
}
+ @ThriftField(10)
public synchronized int getPartitionsToCompute() {
return partitionsToCompute;
}
+ @ThriftField(11)
public synchronized int getPartitionsComputed() {
return partitionsComputed;
}
+ @ThriftField(12)
+ public synchronized boolean isComputationDone() {
+ return computationDone;
+ }
+
+ @ThriftField(13)
public synchronized long getVerticesToStore() {
return verticesToStore;
}
+ @ThriftField(14)
public synchronized long getVerticesStored() {
return verticesStored;
}
+ @ThriftField(15)
public synchronized int getPartitionsToStore() {
return partitionsToStore;
}
+ @ThriftField(16)
public synchronized int getPartitionsStored() {
return partitionsStored;
}
+ @ThriftField(17)
+ public synchronized boolean isStoringDone() {
+ return storingDone;
+ }
+
+ @ThriftField(18)
+ public synchronized int getTaskId() {
+ return taskId;
+ }
+
+ @ThriftField(19)
+ public synchronized double getFreeMemoryMB() {
+ return freeMemoryMB;
+ }
+
public synchronized boolean isInputSuperstep() {
return currentSuperstep == -1;
}
@@ -335,69 +326,98 @@ public class WorkerProgress implements Writable {
return currentSuperstep == Long.MAX_VALUE;
}
- public synchronized int getTaskId() {
- return taskId;
+ @ThriftField
+ public void setCurrentSuperstep(long currentSuperstep) {
+ this.currentSuperstep = currentSuperstep;
}
- public synchronized double getFreeMemoryMB() {
- return freeMemoryMB;
+ @ThriftField
+ public void setVerticesLoaded(long verticesLoaded) {
+ this.verticesLoaded = verticesLoaded;
}
- @Override
- public synchronized void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(currentSuperstep);
+ @ThriftField
+ public void setVertexInputSplitsLoaded(int vertexInputSplitsLoaded) {
+ this.vertexInputSplitsLoaded = vertexInputSplitsLoaded;
+ }
- dataOutput.writeLong(verticesLoaded);
- dataOutput.writeInt(vertexInputSplitsLoaded);
- dataOutput.writeBoolean(loadingVerticesDone);
- dataOutput.writeLong(edgesLoaded);
- dataOutput.writeInt(edgeInputSplitsLoaded);
- dataOutput.writeBoolean(loadingEdgesDone);
+ @ThriftField
+ public void setLoadingVerticesDone(boolean loadingVerticesDone) {
+ this.loadingVerticesDone = loadingVerticesDone;
+ }
- dataOutput.writeLong(verticesToCompute);
- dataOutput.writeLong(verticesComputed);
- dataOutput.writeInt(partitionsToCompute);
- dataOutput.writeInt(partitionsComputed);
+ @ThriftField
+ public void setEdgesLoaded(long edgesLoaded) {
+ this.edgesLoaded = edgesLoaded;
+ }
- dataOutput.writeBoolean(computationDone);
+ @ThriftField
+ public void setEdgeInputSplitsLoaded(int edgeInputSplitsLoaded) {
+ this.edgeInputSplitsLoaded = edgeInputSplitsLoaded;
+ }
- dataOutput.writeLong(verticesToStore);
- dataOutput.writeLong(verticesStored);
- dataOutput.writeInt(partitionsToStore);
- dataOutput.writeInt(partitionsStored);
- dataOutput.writeBoolean(storingDone);
+ @ThriftField
+ public void setLoadingEdgesDone(boolean loadingEdgesDone) {
+ this.loadingEdgesDone = loadingEdgesDone;
+ }
- dataOutput.writeInt(taskId);
+ @ThriftField
+ public void setVerticesToCompute(long verticesToCompute) {
+ this.verticesToCompute = verticesToCompute;
+ }
- dataOutput.writeDouble(freeMemoryMB);
+ @ThriftField
+ public void setVerticesComputed(long verticesComputed) {
+ this.verticesComputed = verticesComputed;
}
- @Override
- public synchronized void readFields(DataInput dataInput) throws IOException {
- currentSuperstep = dataInput.readLong();
+ @ThriftField
+ public void setPartitionsToCompute(int partitionsToCompute) {
+ this.partitionsToCompute = partitionsToCompute;
+ }
- verticesLoaded = dataInput.readLong();
- vertexInputSplitsLoaded = dataInput.readInt();
- loadingVerticesDone = dataInput.readBoolean();
- edgesLoaded = dataInput.readLong();
- edgeInputSplitsLoaded = dataInput.readInt();
- loadingEdgesDone = dataInput.readBoolean();
+ @ThriftField
+ public void setPartitionsComputed(int partitionsComputed) {
+ this.partitionsComputed = partitionsComputed;
+ }
- verticesToCompute = dataInput.readLong();
- verticesComputed = dataInput.readLong();
- partitionsToCompute = dataInput.readInt();
- partitionsComputed = dataInput.readInt();
+ @ThriftField
+ public void setComputationDone(boolean computationDone) {
+ this.computationDone = computationDone;
+ }
- computationDone = dataInput.readBoolean();
+ @ThriftField
+ public void setVerticesToStore(long verticesToStore) {
+ this.verticesToStore = verticesToStore;
+ }
- verticesToStore = dataInput.readLong();
- verticesStored = dataInput.readLong();
- partitionsToStore = dataInput.readInt();
- partitionsStored = dataInput.readInt();
- storingDone = dataInput.readBoolean();
+ @ThriftField
+ public void setVerticesStored(long verticesStored) {
+ this.verticesStored = verticesStored;
+ }
- taskId = dataInput.readInt();
+ @ThriftField
+ public void setPartitionsToStore(int partitionsToStore) {
+ this.partitionsToStore = partitionsToStore;
+ }
+
+ @ThriftField
+ public void setPartitionsStored(int partitionsStored) {
+ this.partitionsStored = partitionsStored;
+ }
+
+ @ThriftField
+ public void setStoringDone(boolean storingDone) {
+ this.storingDone = storingDone;
+ }
- freeMemoryMB = dataInput.readDouble();
+ @ThriftField
+ public void setFreeMemoryMB(double freeMemoryMB) {
+ this.freeMemoryMB = freeMemoryMB;
+ }
+
+ @ThriftField
+ public synchronized void setTaskId(int taskId) {
+ this.taskId = taskId;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index 4ff5bb1..dae9963 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -18,7 +18,7 @@
package org.apache.giraph.worker;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.job.JobProgressTracker;
import org.apache.log4j.Logger;
/**
@@ -31,33 +31,27 @@ public class WorkerProgressWriter {
/** How often to update worker's progress */
private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
+ /** Job progress tracker */
+ private final JobProgressTracker jobProgressTracker;
/** Thread which writes worker's progress */
private final Thread writerThread;
/** Whether worker finished application */
private volatile boolean finished = false;
- /** Path where this worker's progress should be stored */
- private final String myProgressPath;
- /** ZooKeeperExt */
- private final ZooKeeperExt zk;
/**
* Constructor, starts separate thread to periodically update worker's
* progress
*
- * @param myProgressPath Path where this worker's progress should be stored
- * @param zk ZooKeeperExt
+ * @param jobProgressTracker JobProgressTracker to report job progress to
*/
- public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) {
- this.myProgressPath = myProgressPath;
- this.zk = zk;
+ public WorkerProgressWriter(JobProgressTracker jobProgressTracker) {
+ this.jobProgressTracker = jobProgressTracker;
writerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (!finished) {
- WorkerProgress.get().updateMemory();
- WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk,
- WorkerProgressWriter.this.myProgressPath);
+ updateAndSendProgress();
double factor = 1 + Math.random();
Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
}
@@ -73,12 +67,20 @@ public class WorkerProgressWriter {
}
/**
+ * Update worker progress and send it
+ */
+  private void updateAndSendProgress() {
+    // Refresh the memory reading before reporting, using the shared
+    // progress instance for both steps
+    WorkerProgress progress = WorkerProgress.get();
+    progress.updateMemory();
+    jobProgressTracker.updateProgress(progress);
+  }
+
+ /**
* Stop the thread which writes worker's progress
*/
public void stop() throws InterruptedException {
finished = true;
writerThread.interrupt();
writerThread.join();
- WorkerProgress.writeToZnode(zk, myProgressPath);
+ updateAndSendProgress();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index 73ef97b..b5816d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -619,13 +619,13 @@ public class ZooKeeperManager {
myidWriter = new FileWriter(zkDir + "/myid");
myidWriter.write(i + "\n");
} finally {
- Closeables.closeQuietly(myidWriter);
+ Closeables.close(myidWriter, true);
}
}
}
}
} finally {
- Closeables.closeQuietly(writer);
+ Closeables.close(writer, true);
}
} catch (IOException e) {
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index dd2369a..b372ab7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -268,8 +268,8 @@ public class BspCase implements Watcher {
numResults++;
}
} finally {
- Closeables.closeQuietly(in);
- Closeables.closeQuietly(reader);
+ Closeables.close(in, true);
+ Closeables.close(reader, true);
}
}
return numResults;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index f8304a1..f95edcb 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -226,14 +226,6 @@ under the License.
</plugin>
</plugins>
</build>
- <dependencies>
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${dep.oldnetty.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 5612e5f..488e1ea 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -461,8 +461,8 @@ public class
assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
} finally {
- Closeables.closeQuietly(in);
- Closeables.closeQuietly(reader);
+ Closeables.close(in, true);
+ Closeables.close(reader, true);
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
index 7ae8bc3..517901a 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
@@ -176,7 +176,7 @@ public class HiveJythonUtils {
jythonJob = parseJythonStreams(interpreter, streams);
} finally {
for (InputStream stream : streams) {
- Closeables.closeQuietly(stream);
+ Closeables.close(stream, true);
}
}
return jythonJob;
http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b4d78ae..2e3eb63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,7 @@ under the License.
<dep.commons-logging.version>1.1.1</dep.commons-logging.version>
<dep.commons-io.version>2.1</dep.commons-io.version>
<dep.commons-net.version>3.1</dep.commons-net.version>
+ <dep.facebook-swift.version>0.13.1</dep.facebook-swift.version>
<dep.fasterxml-jackson.version>2.1.2</dep.fasterxml-jackson.version>
<dep.fastutil.version>6.5.4</dep.fastutil.version>
<dep.google.findbugs.version>2.0.2</dep.google.findbugs.version>
@@ -309,7 +310,7 @@ under the License.
<!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
<dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
<dep.netty.version>4.0.14.Final</dep.netty.version>
- <dep.paranamer.version>2.3</dep.paranamer.version>
+ <dep.paranamer.version>2.5.2</dep.paranamer.version>
<dep.slf4j.version>1.7.5</dep.slf4j.version>
<dep.tinkerpop.rexter.version>2.4.0</dep.tinkerpop.rexter.version>
<dep.typetools.version>0.2.1</dep.typetools.version>
@@ -1389,6 +1390,14 @@ under the License.
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>units</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -1404,6 +1413,14 @@ under the License.
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>units</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -1594,6 +1611,60 @@ under the License.
<version>${dep.yourkit-api.version}</version>
</dependency>
<dependency>
+ <groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-client</artifactId>
+ <version>${dep.facebook-swift.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-codec</artifactId>
+ <version>${dep.facebook-swift.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-annotations</artifactId>
+ <version>${dep.facebook-swift.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-service</artifactId>
+ <version>${dep.facebook-swift.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${dep.log4j.version}</version>
@@ -2086,6 +2157,14 @@ under the License.
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>units</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>