You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:13 UTC

[30/47] git commit: updated refs/heads/release-1.1 to 4c139ee

GIRAPH-944: Improve job tracking on command line


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4485e563
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4485e563
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4485e563

Branch: refs/heads/release-1.1
Commit: 4485e563a6582afb1c848ea80888fdad50ada516
Parents: de0efb0
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Aug 26 11:35:14 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Aug 26 14:42:52 2014 -0700

----------------------------------------------------------------------
 giraph-core/pom.xml                             |  24 +-
 .../java/org/apache/giraph/bsp/BspService.java  |  12 +-
 .../apache/giraph/bsp/CentralizedService.java   |   9 +
 .../org/apache/giraph/conf/GiraphConstants.java |   2 +-
 .../apache/giraph/graph/GraphTaskManager.java   |  30 +++
 .../giraph/graph/JobProgressTrackerClient.java  |  33 +++
 .../graph/JobProgressTrackerClientNoOp.java     |  47 ++++
 .../RetryableJobProgressTrackerClient.java      | 175 ++++++++++++++
 .../java/org/apache/giraph/job/GiraphJob.java   |  16 +-
 .../apache/giraph/job/JobProgressTracker.java   | 155 +++----------
 .../giraph/job/JobProgressTrackerService.java   | 193 +++++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  |  10 +-
 .../org/apache/giraph/master/MasterCompute.java |  30 ++-
 .../apache/giraph/scripting/ScriptLoader.java   |   6 +-
 .../java/org/apache/giraph/utils/FileUtils.java |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   3 +-
 .../org/apache/giraph/worker/WorkerContext.java |  10 +
 .../apache/giraph/worker/WorkerProgress.java    | 232 ++++++++++---------
 .../giraph/worker/WorkerProgressWriter.java     |  30 +--
 .../org/apache/giraph/zk/ZooKeeperManager.java  |   4 +-
 .../test/java/org/apache/giraph/BspCase.java    |   4 +-
 giraph-examples/pom.xml                         |   8 -
 .../java/org/apache/giraph/TestBspBasic.java    |   4 +-
 .../giraph/hive/jython/HiveJythonUtils.java     |   2 +-
 pom.xml                                         |  81 ++++++-
 25 files changed, 825 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index b66ba1d..23f6666 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -289,14 +289,6 @@ under the License.
           </plugin>
         </plugins>
       </build>
-      <dependencies>
-        <dependency>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-          <version>${dep.oldnetty.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
     </profile>
 
     <profile>
@@ -467,6 +459,22 @@ under the License.
   <dependencies>
     <!-- compile dependencies. sorted lexicographically. -->
     <dependency>
+      <groupId>com.facebook.nifty</groupId>
+      <artifactId>nifty-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.facebook.swift</groupId>
+      <artifactId>swift-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.facebook.swift</groupId>
+      <artifactId>swift-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.facebook.swift</groupId>
+      <artifactId>swift-service</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.facebook.thirdparty.yourkit-api</groupId>
       <artifactId>yjp-controller-api-redist</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index c418a89..2a50489 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.worker.WorkerInfo;
@@ -161,8 +162,6 @@ public abstract class BspService<I extends WritableComparable,
       "/_partitionExchangeDir";
   /** Denotes that the superstep is done */
   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
-  /** Stores progress info for workers */
-  public static final String WORKER_PROGRESSES = "/_workerProgresses";
   /** Denotes that computation should be halted */
   public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
   /** User sets this flag to checkpoint and stop the job */
@@ -241,8 +240,6 @@ public abstract class BspService<I extends WritableComparable,
   protected final String savedCheckpointBasePath;
   /** Path to the master election path */
   protected final String masterElectionPath;
-  /** Stores progress info of this worker */
-  protected final String myProgressPath;
   /** If this path exists computation will be halted */
   protected final String haltComputationPath;
   /** Private ZooKeeper instance that implements the service */
@@ -363,7 +360,6 @@ public abstract class BspService<I extends WritableComparable,
         getCheckpointBasePath(getConfiguration(), getJobId());
 
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
-    myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
     String serverPortList = conf.getZookeeperList();
     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
@@ -404,7 +400,6 @@ public abstract class BspService<I extends WritableComparable,
           "BspService: Invalid superstep to restart - " +
               restartedSuperstep);
     }
-
   }
 
   /**
@@ -1258,6 +1253,11 @@ public abstract class BspService<I extends WritableComparable,
     return lastCheckpointedSuperstep;
   }
 
+  @Override
+  public JobProgressTracker getJobProgressTracker() {
+    return getGraphTaskManager().getJobProgressTracker();
+  }
+
   /**
    * Only get the finalized checkpoint files
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 560f1fb..0cadfb7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.bsp;
 
 import java.util.List;
+
+import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -56,4 +58,11 @@ public interface CentralizedService<I extends WritableComparable,
    * @return List of workers
    */
   List<WorkerInfo> getWorkerInfoList();
+
+  /**
+   * Get JobProgressTracker to report progress to
+   *
+   * @return JobProgressTracker to report progress to
+   */
+  JobProgressTracker getJobProgressTracker();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index da0a8db..d1fdf57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1119,7 +1119,7 @@ public interface GiraphConstants {
 
   /** Whether to track job progress on client or not */
   BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT =
-      new BooleanConfOption("giraph.trackJobProgressOnClient", true,
+      new BooleanConfOption("giraph.trackJobProgressOnClient", false,
           "Whether to track job progress on client or not");
 
   /** Number of retries for creating the HDFS files */

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 8a97939..ba5d2fa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -25,6 +25,7 @@ import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.master.BspServiceMaster;
 import org.apache.giraph.master.MasterAggregatorUsage;
@@ -69,6 +70,7 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -122,6 +124,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /** Superstep stats */
   private FinishedSuperstepStats finishedSuperstepStats =
       new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
+  /** Job progress tracker */
+  private JobProgressTrackerClient jobProgressTracker;
 
   // Per-Job Metrics
   /** Timer for WorkerContext#preApplication() */
@@ -194,6 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     context.setStatus("setup: Beginning worker setup.");
     Configuration hadoopConf = context.getConfiguration();
     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
+    initializeJobProgressTracker();
     // Write user's graph types (I,V,E,M) back to configuration parameters so
     // that they are set for quicker access later. These types are often
     // inferred from the Computation class used.
@@ -245,6 +250,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Create and connect a client to JobProgressTrackerService,
+   * or no-op implementation if progress shouldn't be tracked or something
+   * goes wrong
+   */
+  private void initializeJobProgressTracker() {
+    if (!conf.trackJobProgressOnClient()) {
+      jobProgressTracker = new JobProgressTrackerClientNoOp();
+    } else {
+      try {
+        jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.warn("createJobProgressClient: Exception occurred while trying to" +
+            " connect to JobProgressTracker - not reporting progress", e);
+        jobProgressTracker = new JobProgressTrackerClientNoOp();
+      }
+    }
+    jobProgressTracker.mapperStarted();
+  }
+
+  /**
   * Perform the work assigned to this compute node for this job run.
   * 1) Run checkpoint per frequency policy.
   * 2) For every vertex on this mapper, run the compute() function
@@ -485,6 +510,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     return serviceWorker.getWorkerContext();
   }
 
+  public JobProgressTracker getJobProgressTracker() {
+    return jobProgressTracker;
+  }
+
   /**
    * Copied from JobConf to get the location of this jar.  Workaround for
    * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
@@ -878,6 +907,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     if (LOG.isInfoEnabled()) {
       LOG.info("cleanup: Starting for " + getGraphFunctions());
     }
+    jobProgressTracker.cleanup();
     if (done) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
new file mode 100644
index 0000000..c302d9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.job.JobProgressTracker;
+
+import java.io.IOException;
+
+/**
+ * JobProgressTracker used on the mapper side. Implementations may retry to
+ * connect and swallow exceptions so app wouldn't crash if something goes
+ * wrong with progress reports; cleanup() closes connections, if any.
+ */
+public interface JobProgressTrackerClient extends JobProgressTracker {
+  /** Close the connections if any */
+  void cleanup() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
new file mode 100644
index 0000000..d75fd42
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.worker.WorkerProgress;
+
+/**
+ * Class to use for JobProgressTracker client when progress shouldn't be
+ * tracked or something goes wrong
+ */
+public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient {
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public void mapperStarted() {
+  }
+
+  @Override
+  public void logInfo(String logLine) {
+  }
+
+  @Override
+  public void logFailure(String reason) {
+  }
+
+  @Override
+  public void updateProgress(WorkerProgress workerProgress) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
new file mode 100644
index 0000000..f15a2e7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.job.JobProgressTracker;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.RuntimeTTransportException;
+import com.facebook.swift.service.ThriftClientManager;
+import com.google.common.io.Closeables;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Wrapper around JobProgressTracker which retries to connect and swallows
+ * exceptions so app wouldn't crash if something goes wrong with progress
+ * reports.
+ */
+public class RetryableJobProgressTrackerClient
+    implements JobProgressTrackerClient {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RetryableJobProgressTrackerClient.class);
+  /** Configuration */
+  private final GiraphConfiguration conf;
+  /** Thrift client manager to use to connect to job progress tracker */
+  private ThriftClientManager clientManager;
+  /** Job progress tracker */
+  private JobProgressTracker jobProgressTracker;
+
+  /**
+   * Constructor
+   *
+   * @param conf Giraph configuration
+   */
+  public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
+      ExecutionException, InterruptedException {
+    this.conf = conf;
+    resetConnection();
+  }
+
+  /**
+   * Try to establish new connection to JobProgressTracker
+   */
+  private void resetConnection() throws ExecutionException,
+      InterruptedException {
+    clientManager = new ThriftClientManager();
+    FramedClientConnector connector =
+        new FramedClientConnector(new InetSocketAddress(
+            JOB_PROGRESS_SERVICE_HOST.get(conf),
+            JOB_PROGRESS_SERVICE_PORT.get(conf)));
+    jobProgressTracker =
+        clientManager.createClient(connector, JobProgressTracker.class).get();
+
+  }
+
+  @Override
+  public synchronized void cleanup() throws IOException {
+    Closeables.close(clientManager, true);
+    try {
+      clientManager.close();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Exception occurred while trying to close JobProgressTracker", e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void mapperStarted() {
+    executeWithRetry(new Runnable() {
+      @Override
+      public void run() {
+        jobProgressTracker.mapperStarted();
+      }
+    });
+  }
+
+  @Override
+  public synchronized void logInfo(final String logLine) {
+    executeWithRetry(new Runnable() {
+      @Override
+      public void run() {
+        jobProgressTracker.logInfo(logLine);
+      }
+    });
+  }
+
+  @Override
+  public synchronized void logFailure(final String reason) {
+    executeWithRetry(new Runnable() {
+      @Override
+      public void run() {
+        jobProgressTracker.logFailure(reason);
+      }
+    });
+  }
+
+  @Override
+  public synchronized void updateProgress(final WorkerProgress workerProgress) {
+    executeWithRetry(new Runnable() {
+      @Override
+      public void run() {
+        jobProgressTracker.updateProgress(workerProgress);
+      }
+    });
+  }
+
+  /**
+   * Execute Runnable, if disconnected try to connect again and retry
+   *
+   * @param runnable Runnable to execute
+   */
+  private void executeWithRetry(Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (RuntimeTTransportException te) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RuntimeTTransportException occurred while talking to " +
+            "JobProgressTracker server, trying to reconnect", te);
+      }
+      try {
+        try {
+          clientManager.close();
+          // CHECKSTYLE: stop IllegalCatch
+        } catch (Exception e) {
+          // CHECKSTYLE: resume IllegalCatch
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("");
+          }
+        }
+        resetConnection();
+        runnable.run();
+        // CHECKSTYLE: stop IllegalCatch
+      } catch (Exception e) {
+        // CHECKSTYLE: resume IllegalCatch
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Exception occurred while talking to " +
+              "JobProgressTracker server, giving up", e);
+        }
+      }
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Exception occurred while talking to " +
+            "JobProgressTracker server, giving up", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 436126b..93aa679 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -239,6 +239,9 @@ public class GiraphJob {
     int tryCount = 0;
     GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
     while (true) {
+      JobProgressTrackerService jobProgressTrackerService =
+          JobProgressTrackerService.createJobProgressServer(conf);
+
       tryCount++;
       Job submittedJob = new Job(conf, jobName);
       if (submittedJob.getJar() == null) {
@@ -253,16 +256,17 @@ public class GiraphJob {
       jobObserver.launchingJob(submittedJob);
       submittedJob.submit();
       if (LOG.isInfoEnabled()) {
-        LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
+        LOG.info("Tracking URL: " + submittedJob.getTrackingURL());
+        LOG.info(
+            "Waiting for resources... Job will start only when it gets all " +
+                (conf.getMinWorkers() + 1) + " mappers");
       }
-      HaltApplicationUtils.printHaltInfo(submittedJob, conf);
-      JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ?
-          new JobProgressTracker(submittedJob, conf) : null;
       jobObserver.jobRunning(submittedJob);
+      HaltApplicationUtils.printHaltInfo(submittedJob, conf);
 
       boolean passed = submittedJob.waitForCompletion(verbose);
-      if (jobProgressTracker != null) {
-        jobProgressTracker.stop();
+      if (jobProgressTrackerService != null) {
+        jobProgressTrackerService.stop(passed);
       }
       jobObserver.jobFinished(submittedJob, passed);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 6971174..95bc56d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -18,138 +18,53 @@
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.utils.CounterUtils;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.worker.WorkerProgress;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.facebook.swift.service.ThriftMethod;
+import com.facebook.swift.service.ThriftService;
 
 /**
- * Class which tracks job's progress on client
+ * Interface for job progress tracker on job client
  */
-public class JobProgressTracker implements Watcher {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(JobProgressTracker.class);
-  /** How often to print job's progress */
-  private static final int UPDATE_MILLISECONDS = 5 * 1000;
-  /** Thread which periodically writes job's progress */
-  private Thread writerThread;
-  /** ZooKeeperExt */
-  private ZooKeeperExt zk;
-  /** Whether application is finished */
-  private volatile boolean finished = false;
+@ThriftService
+public interface JobProgressTracker {
+  /** Host on which job progress service runs */
+  StrConfOption JOB_PROGRESS_SERVICE_HOST =
+      new StrConfOption("giraph.jobProgressServiceHost", null,
+          "Host on which job progress service runs");
+  /** Port which job progress service uses */
+  IntConfOption JOB_PROGRESS_SERVICE_PORT =
+      new IntConfOption("giraph.jobProgressServicePort", -1,
+          "Port which job progress service uses");
+
+  /** Notify JobProgressTracker that mapper started */
+  @ThriftMethod
+  void mapperStarted();
 
   /**
-   * Constructor
+   * Call this when you want to log an info line from any mapper to command line
    *
-   * @param submittedJob Job to track
-   * @param conf Configuration
+   * @param logLine Line to log
    */
-  public JobProgressTracker(final Job submittedJob,
-      final GiraphConfiguration conf) throws IOException, InterruptedException {
-    String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
-        submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
-    final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
-        submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
-    // Connect to ZooKeeper
-    if (zkServer != null && basePath != null) {
-      zk = new ZooKeeperExt(
-        zkServer,
-        conf.getZooKeeperSessionTimeout(),
-        conf.getZookeeperOpsMaxAttempts(),
-        conf.getZookeeperOpsRetryWaitMsecs(),
-        this,
-        new Progressable() {
-          @Override
-          public void progress() {
-          }
-        });
-      writerThread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          String workerProgressBasePath = basePath +
-            BspService.WORKER_PROGRESSES;
-          try {
-            while (!finished) {
-              if (zk.exists(workerProgressBasePath, false) != null) {
-                // Get locations of all worker progresses
-                List<String> workerProgressPaths = zk.getChildrenExt(
-                  workerProgressBasePath, false, false, true);
-                List<WorkerProgress> workerProgresses =
-                  new ArrayList<WorkerProgress>(workerProgressPaths.size());
-                // Read all worker progresses
-                for (String workerProgressPath : workerProgressPaths) {
-                  WorkerProgress workerProgress = new WorkerProgress();
-                  byte[] zkData = zk.getData(workerProgressPath, false, null);
-                  WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
-                  workerProgresses.add(workerProgress);
-                }
-                // Combine and log
-                CombinedWorkerProgress combinedWorkerProgress =
-                  new CombinedWorkerProgress(workerProgresses);
-                if (LOG.isInfoEnabled()) {
-                  LOG.info(combinedWorkerProgress.toString());
-                }
-                // Check if application is done
-                if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
-                  break;
-                }
-              }
-              Thread.sleep(UPDATE_MILLISECONDS);
-            }
-            // CHECKSTYLE: stop IllegalCatchCheck
-          } catch (Exception e) {
-            // CHECKSTYLE: resume IllegalCatchCheck
-            if (LOG.isInfoEnabled()) {
-              LOG.info("run: Exception occurred", e);
-            }
-          } finally {
-            try {
-              // Create a node so master knows we stopped communicating with
-              // ZooKeeper and it's safe to cleanup
-              zk.createExt(
-                basePath + BspService.CLEANED_UP_DIR + "/client",
-                null,
-                ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT,
-                true);
-              zk.close();
-              // CHECKSTYLE: stop IllegalCatchCheck
-            } catch (Exception e) {
-              // CHECKSTYLE: resume IllegalCatchCheck
-              if (LOG.isInfoEnabled()) {
-                LOG.info("run: Exception occurred", e);
-              }
-            }
-          }
-        }
-      });
-      writerThread.start();
-    }
-  }
+  @ThriftMethod
+  void logInfo(String logLine);
 
   /**
-   * Stop the thread which logs application progress
+   * Notify that job is failing
+   *
+   * @param reason Reason for failure
    */
-  public void stop() {
-    finished = true;
-  }
+  @ThriftMethod
+  void logFailure(String reason);
 
-  @Override
-  public void process(WatchedEvent event) {
-  }
+  /**
+   * Workers should call this method to update their progress
+   *
+   * @param workerProgress Progress of the worker
+   */
+  @ThriftMethod
+  void updateProgress(WorkerProgress workerProgress);
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
new file mode 100644
index 0000000..3a896e2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import com.facebook.swift.codec.ThriftCodecManager;
+import com.facebook.swift.service.ThriftEventHandler;
+import com.facebook.swift.service.ThriftServer;
+import com.facebook.swift.service.ThriftServerConfig;
+import com.facebook.swift.service.ThriftServiceProcessor;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of job progress tracker service on job client.
+ *
+ * Mappers report through the {@link JobProgressTracker} methods, and a
+ * background thread periodically logs the combined progress of workers.
+ */
+public class JobProgressTrackerService implements JobProgressTracker {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(JobProgressTrackerService.class);
+  /** How often to print job's progress */
+  private static final int UPDATE_MILLISECONDS = 10 * 1000;
+
+  /** Configuration */
+  private final GiraphConfiguration conf;
+  /** Thread which periodically writes job's progress */
+  private Thread writerThread;
+  /** Whether application is finished */
+  private volatile boolean finished = false;
+  /** Server which uses this service, null until the factory sets it */
+  private ThriftServer server;
+  /**
+   * Number of mappers which the job got. Updated under this object's lock
+   * but read by the writer thread without it, hence volatile.
+   */
+  private volatile int mappersStarted;
+  /** Last time number of mappers started was logged */
+  private long lastTimeMappersStartedLogged;
+  /** Map of worker progresses */
+  private final Map<Integer, WorkerProgress> workerProgresses =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Constructor, starts the thread which periodically logs job's progress
+   *
+   * @param conf Configuration
+   */
+  public JobProgressTrackerService(GiraphConfiguration conf) {
+    this.conf = conf;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Waiting for job to start... (this may take a minute)");
+    }
+    startWriterThread();
+  }
+
+  /**
+   * Start the thread which writes progress periodically. The thread is a
+   * daemon so that it can never keep the client JVM alive on its own.
+   */
+  private void startWriterThread() {
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!finished) {
+          if (mappersStarted == conf.getMaxWorkers() + 1 &&
+              !workerProgresses.isEmpty()) {
+            // Combine and log
+            CombinedWorkerProgress combinedWorkerProgress =
+                new CombinedWorkerProgress(workerProgresses.values());
+            if (LOG.isInfoEnabled()) {
+              LOG.info(combinedWorkerProgress.toString());
+            }
+            // Check if application is done
+            if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+              break;
+            }
+          }
+          try {
+            Thread.sleep(UPDATE_MILLISECONDS);
+          } catch (InterruptedException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Progress thread interrupted");
+            }
+            break;
+          }
+        }
+      }
+    });
+    writerThread.setDaemon(true);
+    writerThread.start();
+  }
+
+  /**
+   * Count one more started mapper, logging once all of them have started
+   * and at most once per update period while some are still missing
+   */
+  @Override
+  public synchronized void mapperStarted() {
+    mappersStarted++;
+    if (LOG.isInfoEnabled()) {
+      if (mappersStarted == conf.getMaxWorkers() + 1) {
+        LOG.info("Got all " + mappersStarted + " mappers");
+      } else {
+        if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
+            UPDATE_MILLISECONDS) {
+          lastTimeMappersStartedLogged = System.currentTimeMillis();
+          LOG.info("Got " + mappersStarted + " but needs " +
+              (conf.getMaxWorkers() + 1) + " mappers");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void logInfo(String logLine) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(logLine);
+    }
+  }
+
+  /**
+   * Log the failure reason and stop the periodic progress logging
+   *
+   * @param reason Reason for failure
+   */
+  @Override
+  public void logFailure(String reason) {
+    LOG.fatal(reason);
+    finished = true;
+    writerThread.interrupt();
+  }
+
+  @Override
+  public void updateProgress(WorkerProgress workerProgress) {
+    // Keep only the latest progress reported for each task id
+    workerProgresses.put(workerProgress.getTaskId(), workerProgress);
+  }
+
+  /**
+   * Stop the thread which logs application progress and server
+   *
+   * @param succeeded Whether job succeeded or not
+   */
+  public void stop(boolean succeeded) {
+    shutdown();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
+          ", cleaning up...");
+    }
+  }
+
+  /**
+   * Stop the writer thread and close the server if one was created
+   */
+  private void shutdown() {
+    finished = true;
+    writerThread.interrupt();
+    if (server != null) {
+      server.close();
+    }
+  }
+
+  /**
+   * Create job progress server on job client, and update configuration with
+   * its hostname and port so mappers would know what to connect to. Returns
+   * null if progress shouldn't be tracked, or if the server can't be
+   * started - in which case the half-created service is shut down first
+   *
+   * @param conf Configuration
+   * @return JobProgressTrackerService, or null if progress is not tracked
+   */
+  public static JobProgressTrackerService createJobProgressServer(
+      GiraphConfiguration conf) {
+    if (!conf.trackJobProgressOnClient()) {
+      return null;
+    }
+    JobProgressTrackerService service = null;
+    try {
+      service = new JobProgressTrackerService(conf);
+      ThriftServiceProcessor processor =
+          new ThriftServiceProcessor(new ThriftCodecManager(),
+              new ArrayList<ThriftEventHandler>(), service);
+      service.server = new ThriftServer(processor, new ThriftServerConfig());
+      service.server.start();
+      JOB_PROGRESS_SERVICE_HOST.set(conf,
+          InetAddress.getLocalHost().getHostName());
+      JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort());
+      return service;
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.warn("Exception occurred while trying to create " +
+          "JobProgressTrackerService - not using progress reporting", e);
+      shutdownQuietly(service);
+      return null;
+    }
+  }
+
+  /**
+   * Shut down a service whose creation failed, so that neither its writer
+   * thread nor its server outlive the failed attempt
+   *
+   * @param service Service to shut down, may be null
+   */
+  private static void shutdownQuietly(JobProgressTrackerService service) {
+    if (service == null) {
+      return;
+    }
+    try {
+      service.shutdown();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.warn("Exception occurred while shutting down " +
+          "JobProgressTrackerService", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 671df23..efa5b87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -307,6 +307,7 @@ public class BspServiceMaster<I extends WritableComparable,
    * @param reason The reason the job failed
    */
   private void setJobStateFailed(String reason) {
+    getGraphTaskManager().getJobProgressTracker().logFailure(reason);
     setJobState(ApplicationState.FAILED, -1, -1, false);
     failJob(new IllegalStateException(reason));
   }
@@ -644,7 +645,8 @@ public class BspServiceMaster<I extends WritableComparable,
           "check input of " + inputFormat.getClass().getName() + "!");
       getContext().setStatus("Failing job due to 0 input splits, " +
           "check input of " + inputFormat.getClass().getName() + "!");
-      setJobStateFailed("0 input splits");
+      setJobStateFailed("Please check your input tables - partitions which " +
+          "you specified are missing. Failing the job!!!");
     }
     if (minSplitCountHint > splitList.size()) {
       LOG.warn(logPrefix + ": Number of inputSplits=" +
@@ -885,7 +887,7 @@ public class BspServiceMaster<I extends WritableComparable,
               getContext());
           aggregatorHandler.initialize(this);
           masterCompute = getConfiguration().createMasterCompute();
-          masterCompute.setMasterAggregatorUsage(aggregatorHandler);
+          masterCompute.setMasterService(this);
 
           masterInfo = new MasterInfo();
           masterServer =
@@ -1790,10 +1792,6 @@ public class BspServiceMaster<I extends WritableComparable,
         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
       maxTasks *= 2;
     }
-    if (getConfiguration().trackJobProgressOnClient()) {
-      // For job client
-      maxTasks++;
-    }
     List<String> cleanedUpChildrenList = null;
     while (true) {
       try {

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index c2a1f9a..552cca9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.master;
 
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Computation;
@@ -46,7 +47,7 @@ public abstract class MasterCompute
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
   /** Master aggregator usage */
-  private MasterAggregatorUsage masterAggregatorUsage;
+  private CentralizedServiceMaster serviceMaster;
   /** Graph state */
   private GraphState graphState;
   /**
@@ -192,14 +193,16 @@ public abstract class MasterCompute
   public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)
     throws InstantiationException, IllegalAccessException {
-    return masterAggregatorUsage.registerAggregator(name, aggregatorClass);
+    return serviceMaster.getAggregatorHandler().registerAggregator(
+        name, aggregatorClass);
   }
 
   @Override
   public final <A extends Writable> boolean registerAggregator(
     String name, WritableFactory<? extends Aggregator<A>> aggregator)
     throws InstantiationException, IllegalAccessException {
-    return masterAggregatorUsage.registerAggregator(name, aggregator);
+    return serviceMaster.getAggregatorHandler().registerAggregator(
+        name, aggregator);
   }
 
   @Override
@@ -207,28 +210,37 @@ public abstract class MasterCompute
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    return masterAggregatorUsage.registerPersistentAggregator(
+    return serviceMaster.getAggregatorHandler().registerPersistentAggregator(
         name, aggregatorClass);
   }
 
   @Override
   public final <A extends Writable> A getAggregatedValue(String name) {
-    return masterAggregatorUsage.<A>getAggregatedValue(name);
+    return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name);
   }
 
   @Override
   public final <A extends Writable> void setAggregatedValue(
       String name, A value) {
-    masterAggregatorUsage.setAggregatedValue(name, value);
+    serviceMaster.getAggregatorHandler().setAggregatedValue(name, value);
+  }
+
+  /**
+   * Log a line to the command line of the job through the job progress
+   * tracker. Use in moderation - it is a synchronous call to the job client
+   *
+   * @param line Line to print
+   */
+  public void logToCommandLine(String line) {
+    serviceMaster.getJobProgressTracker().logInfo(line);
   }
 
   final void setGraphState(GraphState graphState) {
     this.graphState = graphState;
   }
 
-  final void setMasterAggregatorUsage(MasterAggregatorUsage
-      masterAggregatorUsage) {
-    this.masterAggregatorUsage = masterAggregatorUsage;
+  final void setMasterService(CentralizedServiceMaster serviceMaster) {
+    this.serviceMaster = serviceMaster;
   }
 
   final void setSuperstepClasses(SuperstepClasses superstepClasses) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
index 2b30739..f78b1a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
@@ -150,7 +150,7 @@ public class ScriptLoader {
    *
    * @param conf Configuration
    */
-  public static void loadScripts(Configuration conf) {
+  public static void loadScripts(Configuration conf) throws IOException {
     List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
     if (deployedScripts == null) {
       return;
@@ -167,7 +167,7 @@ public class ScriptLoader {
    * @param deployedScript the deployed script
    */
   public static void loadScript(Configuration conf,
-      DeployedScript deployedScript) {
+      DeployedScript deployedScript) throws IOException {
     InputStream stream = openScriptInputStream(conf, deployedScript);
     switch (deployedScript.getLanguage()) {
     case JYTHON:
@@ -180,7 +180,7 @@ public class ScriptLoader {
     }
 
     LOADED_SCRIPTS.add(deployedScript);
-    Closeables.closeQuietly(stream);
+    Closeables.close(stream, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
index 6e8b1e3..0f9a08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
@@ -129,7 +129,7 @@ public class FileUtils {
         writer.write('\n');
       }
     } finally {
-      Closeables.closeQuietly(writer);
+      Closeables.close(writer, true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 447bb6f..120678f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -229,7 +229,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
     WorkerProgress.get().setTaskId(getTaskPartition());
     workerProgressWriter = conf.trackJobProgressOnClient() ?
-        new WorkerProgressWriter(myProgressPath, getZkExt()) : null;
+        new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
+        null;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index aca9944..7a55d56 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -206,6 +206,16 @@ public abstract class WorkerContext
     return workerAggregatorUsage.<A>getAggregatedValue(name);
   }
 
+  /**
+   * Log a line to the command line of the job through the job progress
+   * tracker. Use in moderation - it is a synchronous call to the job client
+   *
+   * @param line Line to print
+   */
+  public void logToCommandLine(String line) {
+    serviceWorker.getJobProgressTracker().logInfo(line);
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index 1a2a6ee..24f791b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -19,17 +19,9 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -38,9 +30,8 @@ import javax.annotation.concurrent.ThreadSafe;
  * ZooKeeper with {@link WorkerProgressWriter}.
  */
 @ThreadSafe
-public class WorkerProgress implements Writable {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(WorkerProgress.class);
+@ThriftStruct
+public class WorkerProgress {
   /** Singleton instance for everyone to use */
   private static final WorkerProgress INSTANCE = new WorkerProgress();
 
@@ -99,45 +90,6 @@ public class WorkerProgress implements Writable {
   }
 
   /**
-   * Write worker's progress to znode
-   *
-   * @param zk ZooKeeperExt
-   * @param myProgressPath Path to write the progress to
-   */
-  public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) {
-    byte[] byteArray = WritableUtils.writeToByteArray(get());
-    try {
-      zk.createOrSetExt(myProgressPath,
-          byteArray,
-          ZooDefs.Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true,
-          -1);
-    } catch (KeeperException | InterruptedException e) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("writeToZnode: " + e.getClass().getName() +
-            " exception occurred", e);
-      }
-    }
-  }
-
-  public synchronized boolean isLoadingVerticesDone() {
-    return loadingVerticesDone;
-  }
-
-  public synchronized boolean isLoadingEdgesDone() {
-    return loadingEdgesDone;
-  }
-
-  public synchronized boolean isComputationDone() {
-    return computationDone;
-  }
-
-  public synchronized boolean isStoringDone() {
-    return storingDone;
-  }
-
-  /**
    * Add number of vertices loaded
    *
    * @param verticesLoaded How many vertices were loaded since the last
@@ -188,8 +140,8 @@ public class WorkerProgress implements Writable {
   /**
    * Notify this class that next computation superstep is starting
    *
-   * @param superstep Superstep which is starting
-   * @param verticesToCompute How many vertices are there to compute
+   * @param superstep           Superstep which is starting
+   * @param verticesToCompute   How many vertices are there to compute
    * @param partitionsToCompute How many partitions are there to compute
    */
   public synchronized void startSuperstep(long superstep,
@@ -221,7 +173,7 @@ public class WorkerProgress implements Writable {
   /**
    * Notify this class that worker is starting to store data
    *
-   * @param verticesToStore How many vertices should be stored
+   * @param verticesToStore   How many vertices should be stored
    * @param partitionsToStore How many partitions should be stored
    */
   public synchronized void startStoring(long verticesToStore,
@@ -260,10 +212,6 @@ public class WorkerProgress implements Writable {
     storingDone = true;
   }
 
-  public synchronized void setTaskId(int taskId) {
-    this.taskId = taskId;
-  }
-
   /**
    * Update memory info
    */
@@ -271,58 +219,101 @@ public class WorkerProgress implements Writable {
     freeMemoryMB = MemoryUtils.freeMemoryMB();
   }
 
+  @ThriftField(1)
   public synchronized long getCurrentSuperstep() {
     return currentSuperstep;
   }
 
+  @ThriftField(2)
   public synchronized long getVerticesLoaded() {
     return verticesLoaded;
   }
 
+  @ThriftField(3)
   public synchronized int getVertexInputSplitsLoaded() {
     return vertexInputSplitsLoaded;
   }
 
+  @ThriftField(4)
+  public synchronized boolean isLoadingVerticesDone() {
+    return loadingVerticesDone;
+  }
+
+  @ThriftField(5)
   public synchronized long getEdgesLoaded() {
     return edgesLoaded;
   }
 
+  @ThriftField(6)
   public synchronized int getEdgeInputSplitsLoaded() {
     return edgeInputSplitsLoaded;
   }
 
+  @ThriftField(7)
+  public synchronized boolean isLoadingEdgesDone() {
+    return loadingEdgesDone;
+  }
+
+  @ThriftField(8)
   public synchronized long getVerticesToCompute() {
     return verticesToCompute;
   }
 
+  @ThriftField(9)
   public synchronized long getVerticesComputed() {
     return verticesComputed;
   }
 
+  @ThriftField(10)
   public synchronized int getPartitionsToCompute() {
     return partitionsToCompute;
   }
 
+  @ThriftField(11)
   public synchronized int getPartitionsComputed() {
     return partitionsComputed;
   }
 
+  @ThriftField(12)
+  public synchronized boolean isComputationDone() {
+    return computationDone;
+  }
+
+  @ThriftField(13)
   public synchronized long getVerticesToStore() {
     return verticesToStore;
   }
 
+  @ThriftField(14)
   public synchronized long getVerticesStored() {
     return verticesStored;
   }
 
+  @ThriftField(15)
   public synchronized int getPartitionsToStore() {
     return partitionsToStore;
   }
 
+  @ThriftField(16)
   public synchronized int getPartitionsStored() {
     return partitionsStored;
   }
 
+  @ThriftField(17)
+  public synchronized boolean isStoringDone() {
+    return storingDone;
+  }
+
+  @ThriftField(18)
+  public synchronized int getTaskId() {
+    return taskId;
+  }
+
+  @ThriftField(19)
+  public synchronized double getFreeMemoryMB() {
+    return freeMemoryMB;
+  }
+
   public synchronized boolean isInputSuperstep() {
     return currentSuperstep == -1;
   }
@@ -335,69 +326,98 @@ public class WorkerProgress implements Writable {
     return currentSuperstep == Long.MAX_VALUE;
   }
 
-  public synchronized int getTaskId() {
-    return taskId;
+  @ThriftField
+  public void setCurrentSuperstep(long currentSuperstep) {
+    this.currentSuperstep = currentSuperstep;
   }
 
-  public synchronized double getFreeMemoryMB() {
-    return freeMemoryMB;
+  @ThriftField
+  public void setVerticesLoaded(long verticesLoaded) {
+    this.verticesLoaded = verticesLoaded;
   }
 
-  @Override
-  public synchronized void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeLong(currentSuperstep);
+  @ThriftField
+  public void setVertexInputSplitsLoaded(int vertexInputSplitsLoaded) {
+    this.vertexInputSplitsLoaded = vertexInputSplitsLoaded;
+  }
 
-    dataOutput.writeLong(verticesLoaded);
-    dataOutput.writeInt(vertexInputSplitsLoaded);
-    dataOutput.writeBoolean(loadingVerticesDone);
-    dataOutput.writeLong(edgesLoaded);
-    dataOutput.writeInt(edgeInputSplitsLoaded);
-    dataOutput.writeBoolean(loadingEdgesDone);
+  @ThriftField
+  public void setLoadingVerticesDone(boolean loadingVerticesDone) {
+    this.loadingVerticesDone = loadingVerticesDone;
+  }
 
-    dataOutput.writeLong(verticesToCompute);
-    dataOutput.writeLong(verticesComputed);
-    dataOutput.writeInt(partitionsToCompute);
-    dataOutput.writeInt(partitionsComputed);
+  @ThriftField
+  public void setEdgesLoaded(long edgesLoaded) {
+    this.edgesLoaded = edgesLoaded;
+  }
 
-    dataOutput.writeBoolean(computationDone);
+  @ThriftField
+  public void setEdgeInputSplitsLoaded(int edgeInputSplitsLoaded) {
+    this.edgeInputSplitsLoaded = edgeInputSplitsLoaded;
+  }
 
-    dataOutput.writeLong(verticesToStore);
-    dataOutput.writeLong(verticesStored);
-    dataOutput.writeInt(partitionsToStore);
-    dataOutput.writeInt(partitionsStored);
-    dataOutput.writeBoolean(storingDone);
+  @ThriftField
+  public void setLoadingEdgesDone(boolean loadingEdgesDone) {
+    this.loadingEdgesDone = loadingEdgesDone;
+  }
 
-    dataOutput.writeInt(taskId);
+  @ThriftField
+  public void setVerticesToCompute(long verticesToCompute) {
+    this.verticesToCompute = verticesToCompute;
+  }
 
-    dataOutput.writeDouble(freeMemoryMB);
+  @ThriftField
+  public void setVerticesComputed(long verticesComputed) {
+    this.verticesComputed = verticesComputed;
   }
 
-  @Override
-  public synchronized void readFields(DataInput dataInput) throws IOException {
-    currentSuperstep = dataInput.readLong();
+  @ThriftField
+  public void setPartitionsToCompute(int partitionsToCompute) {
+    this.partitionsToCompute = partitionsToCompute;
+  }
 
-    verticesLoaded = dataInput.readLong();
-    vertexInputSplitsLoaded = dataInput.readInt();
-    loadingVerticesDone = dataInput.readBoolean();
-    edgesLoaded = dataInput.readLong();
-    edgeInputSplitsLoaded = dataInput.readInt();
-    loadingEdgesDone = dataInput.readBoolean();
+  @ThriftField
+  public void setPartitionsComputed(int partitionsComputed) {
+    this.partitionsComputed = partitionsComputed;
+  }
 
-    verticesToCompute = dataInput.readLong();
-    verticesComputed = dataInput.readLong();
-    partitionsToCompute = dataInput.readInt();
-    partitionsComputed = dataInput.readInt();
+  @ThriftField
+  public void setComputationDone(boolean computationDone) {
+    this.computationDone = computationDone;
+  }
 
-    computationDone = dataInput.readBoolean();
+  @ThriftField
+  public void setVerticesToStore(long verticesToStore) {
+    this.verticesToStore = verticesToStore;
+  }
 
-    verticesToStore = dataInput.readLong();
-    verticesStored = dataInput.readLong();
-    partitionsToStore = dataInput.readInt();
-    partitionsStored = dataInput.readInt();
-    storingDone = dataInput.readBoolean();
+  @ThriftField
+  public void setVerticesStored(long verticesStored) {
+    this.verticesStored = verticesStored;
+  }
 
-    taskId = dataInput.readInt();
+  @ThriftField
+  public void setPartitionsToStore(int partitionsToStore) {
+    this.partitionsToStore = partitionsToStore;
+  }
+
+  @ThriftField
+  public void setPartitionsStored(int partitionsStored) {
+    this.partitionsStored = partitionsStored;
+  }
+
+  @ThriftField
+  public void setStoringDone(boolean storingDone) {
+    this.storingDone = storingDone;
+  }
 
-    freeMemoryMB = dataInput.readDouble();
+  @ThriftField
+  public void setFreeMemoryMB(double freeMemoryMB) {
+    this.freeMemoryMB = freeMemoryMB;
+  }
+
+  @ThriftField
+  public synchronized void setTaskId(int taskId) {
+    this.taskId = taskId;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index 4ff5bb1..dae9963 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.worker;
 
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.job.JobProgressTracker;
 import org.apache.log4j.Logger;
 
 /**
@@ -31,33 +31,27 @@ public class WorkerProgressWriter {
   /** How often to update worker's progress */
   private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
 
+  /** Job progress tracker */
+  private final JobProgressTracker jobProgressTracker;
   /** Thread which writes worker's progress */
   private final Thread writerThread;
   /** Whether worker finished application */
   private volatile boolean finished = false;
-  /** Path where this worker's progress should be stored */
-  private final String myProgressPath;
-  /** ZooKeeperExt */
-  private final ZooKeeperExt zk;
 
   /**
    * Constructor, starts separate thread to periodically update worker's
    * progress
    *
-   * @param myProgressPath Path where this worker's progress should be stored
-   * @param zk ZooKeeperExt
+   * @param jobProgressTracker JobProgressTracker to report job progress to
    */
-  public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) {
-    this.myProgressPath = myProgressPath;
-    this.zk = zk;
+  public WorkerProgressWriter(JobProgressTracker jobProgressTracker) {
+    this.jobProgressTracker = jobProgressTracker;
     writerThread = new Thread(new Runnable() {
       @Override
       public void run() {
         try {
           while (!finished) {
-            WorkerProgress.get().updateMemory();
-            WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk,
-                WorkerProgressWriter.this.myProgressPath);
+            updateAndSendProgress();
             double factor = 1 + Math.random();
             Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
           }
@@ -73,12 +67,20 @@ public class WorkerProgressWriter {
   }
 
   /**
+   * Refresh memory info of this worker's progress and send it to the tracker
+   */
+  private void updateAndSendProgress() {
+    WorkerProgress.get().updateMemory();
+    jobProgressTracker.updateProgress(WorkerProgress.get());
+  }
+
+  /**
    * Stop the thread which writes worker's progress
    */
   public void stop() throws InterruptedException {
     finished = true;
     writerThread.interrupt();
     writerThread.join();
-    WorkerProgress.writeToZnode(zk, myProgressPath);
+    updateAndSendProgress();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index 73ef97b..b5816d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -619,13 +619,13 @@ public class ZooKeeperManager {
                 myidWriter = new FileWriter(zkDir + "/myid");
                 myidWriter.write(i + "\n");
               } finally {
-                Closeables.closeQuietly(myidWriter);
+                Closeables.close(myidWriter, true);
               }
             }
           }
         }
       } finally {
-        Closeables.closeQuietly(writer);
+        Closeables.close(writer, true);
       }
     } catch (IOException e) {
       throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index dd2369a..b372ab7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -268,8 +268,8 @@ public class BspCase implements Watcher {
           numResults++;
         }
       } finally {
-        Closeables.closeQuietly(in);
-        Closeables.closeQuietly(reader);
+        Closeables.close(in, true);
+        Closeables.close(reader, true);
       }
     }
     return numResults;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index f8304a1..f95edcb 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -226,14 +226,6 @@ under the License.
           </plugin>
         </plugins>
       </build>
-      <dependencies>
-        <dependency>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-          <version>${dep.oldnetty.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
     </profile>
 
     <profile>

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 5612e5f..488e1ea 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -461,8 +461,8 @@ public class
           assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
 
         } finally {
-          Closeables.closeQuietly(in);
-          Closeables.closeQuietly(reader);
+          Closeables.close(in, true);
+          Closeables.close(reader, true);
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
index 7ae8bc3..517901a 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
@@ -176,7 +176,7 @@ public class HiveJythonUtils {
       jythonJob = parseJythonStreams(interpreter, streams);
     } finally {
       for (InputStream stream : streams) {
-        Closeables.closeQuietly(stream);
+        Closeables.close(stream, true);
       }
     }
     return jythonJob;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b4d78ae..2e3eb63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,7 @@ under the License.
     <dep.commons-logging.version>1.1.1</dep.commons-logging.version>
     <dep.commons-io.version>2.1</dep.commons-io.version>
     <dep.commons-net.version>3.1</dep.commons-net.version>
+    <dep.facebook-swift.version>0.13.1</dep.facebook-swift.version>
     <dep.fasterxml-jackson.version>2.1.2</dep.fasterxml-jackson.version>
     <dep.fastutil.version>6.5.4</dep.fastutil.version>
     <dep.google.findbugs.version>2.0.2</dep.google.findbugs.version>
@@ -309,7 +310,7 @@ under the License.
     <!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
     <dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
     <dep.netty.version>4.0.14.Final</dep.netty.version>
-    <dep.paranamer.version>2.3</dep.paranamer.version>
+    <dep.paranamer.version>2.5.2</dep.paranamer.version>
     <dep.slf4j.version>1.7.5</dep.slf4j.version>
     <dep.tinkerpop.rexter.version>2.4.0</dep.tinkerpop.rexter.version>
     <dep.typetools.version>0.2.1</dep.typetools.version>
@@ -1389,6 +1390,14 @@ under the License.
             <groupId>commons-beanutils</groupId>
             <artifactId>commons-beanutils-core</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -1404,6 +1413,14 @@ under the License.
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -1594,6 +1611,60 @@ under the License.
         <version>${dep.yourkit-api.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.facebook.nifty</groupId>
+        <artifactId>nifty-client</artifactId>
+        <version>${dep.facebook-swift.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>com.facebook.swift</groupId>
+        <artifactId>swift-codec</artifactId>
+        <version>${dep.facebook-swift.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-all</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>com.facebook.swift</groupId>
+        <artifactId>swift-annotations</artifactId>
+        <version>${dep.facebook-swift.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.facebook.swift</groupId>
+        <artifactId>swift-service</artifactId>
+        <version>${dep.facebook-swift.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>log4j</groupId>
         <artifactId>log4j</artifactId>
         <version>${dep.log4j.version}</version>
@@ -2086,6 +2157,14 @@ under the License.
             <groupId>commons-beanutils</groupId>
             <artifactId>commons-beanutils-core</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>