You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/04 18:08:36 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate on the job progress

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e95bd3  [GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate on the job progress
9e95bd3 is described below

commit 9e95bd32d6115175b532dbbd9f7e73bae8059c02
Author: ibuenros <is...@gmail.com>
AuthorDate: Mon Mar 4 10:08:31 2019 -0800

    [GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate on the job progress
    
    Closes #2548 from ibuenros/early-termination-2
---
 .../gobblin/runtime/JobShutdownException.java      |  27 ++
 .../apache/gobblin/source/extractor/Extractor.java |  31 +-
 .../runtime/embedded/EmbeddedGobblinDistcp.java    |   2 +-
 .../embedded/EmbeddedGobblinDistcpTest.java        |   1 +
 .../extractor/extract/kafka/KafkaExtractor.java    |  14 +
 gobblin-runtime/build.gradle                       |   5 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |  42 +-
 .../java/org/apache/gobblin/runtime/JobState.java  |  23 +-
 .../main/java/org/apache/gobblin/runtime/Task.java |   8 +-
 .../java/org/apache/gobblin/runtime/TaskState.java |   3 +-
 .../gobblin/runtime/embedded/EmbeddedGobblin.java  |   7 +-
 .../runtime/job/GobblinJobFiniteStateMachine.java  | 180 ++++++++
 .../runtime/job/JobInterruptionPredicate.java      | 111 +++++
 .../apache/gobblin/runtime/job/JobProgress.java    |  38 ++
 .../apache/gobblin/runtime/job/TaskProgress.java   |  33 ++
 .../gobblin/runtime/local/LocalJobLauncher.java    |   5 +
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 126 ++++--
 .../spec_catalog/SpecCatalogListenersList.java     |  34 --
 .../gobblin/util/ReflectivePredicateEvaluator.java | 370 ++++++++++++++++
 .../runtime/job/JobInterruptionPredicateTest.java  | 133 ++++++
 .../util/ReflectivePredicateEvaluatorTest.java     | 125 ++++++
 .../org/apache/gobblin/fsm/FiniteStateMachine.java | 463 +++++++++++++++++++++
 .../org/apache/gobblin/fsm/StateWithCallbacks.java |  45 ++
 .../apache/gobblin/fsm/FiniteStateMachineTest.java | 344 +++++++++++++++
 gradle/scripts/defaultBuildProperties.gradle       |   2 +-
 gradle/scripts/dependencyDefinitions.gradle        |   4 +-
 gradle/scripts/globalDependencies.gradle           |   1 +
 27 files changed, 2095 insertions(+), 82 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
new file mode 100644
index 0000000..2c7d55a
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * An exception thrown when a job cannot be gracefully shut down.
+ */
+public class JobShutdownException extends Exception {
+	public JobShutdownException(String message) {
+		super(message);
+	}
+}
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
index 9749795..2ea900b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
@@ -23,8 +23,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.Decorator;
 
 import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 import io.reactivex.Emitter;
@@ -54,7 +56,7 @@ public interface Extractor<S, D> extends Closeable {
    * @return schema of the extracted data records
    * @throws java.io.IOException if there is problem getting the schema
    */
-  public S getSchema() throws IOException;
+  S getSchema() throws IOException;
 
   /**
    * Read the next data record from the data source.
@@ -78,7 +80,7 @@ public interface Extractor<S, D> extends Closeable {
    *
    * @return the expected source record count
    */
-  public long getExpectedRecordCount();
+  long getExpectedRecordCount();
 
   /**
    * Get the calculated high watermark up to which data records are to be extracted.
@@ -88,7 +90,23 @@ public interface Extractor<S, D> extends Closeable {
    * <a href="https://github.com/linkedin/gobblin/wiki/Watermarks">Watermarks</a> for more information.
    */
   @Deprecated
-  public long getHighWatermark();
+  long getHighWatermark();
+
+  /**
+   * Called to notify the Extractor it should shut down as soon as possible. If this call returns successfully, the task
+   * will continue consuming records from the Extractor and continue execution normally. The extractor should only emit
+   * those records necessary to stop at a graceful committable state. Most job executors will eventually kill the task
+   * if the Extractor does not stop emitting records after a few seconds.
+   *
+   * @throws JobShutdownException if the extractor does not support early termination. This will cause the task to fail.
+   */
+  default void shutdown() throws JobShutdownException {
+    if (this instanceof Decorator && ((Decorator) this).getDecoratedObject() instanceof Extractor) {
+      ((Extractor) ((Decorator) this).getDecoratedObject()).shutdown();
+    } else {
+      throw new JobShutdownException(this.getClass().getName() + ": Extractor does not support shutdown.");
+    }
+  }
 
   /**
    * Read an {@link RecordEnvelope}. By default, just wrap {@link #readRecord(Object)} in a {@link RecordEnvelope}.
@@ -115,7 +133,12 @@ public interface Extractor<S, D> extends Closeable {
     S schema = getSchema();
     Flowable<StreamEntity<D>> recordStream = Flowable.generate(() -> shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state, emitter) -> {
       if (state.get()) {
-        emitter.onComplete();
+        // shutdown requested
+        try {
+          shutdown();
+        } catch (JobShutdownException exc) {
+          emitter.onError(exc);
+        }
       }
       try {
         StreamEntity<D> record = readStreamEntity();
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index e52748f..f7076bd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -82,7 +82,7 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
     this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, to.getFileSystem(new Configuration()).getUri().toString());
 
     // add gobblin-data-management jar to distributed jars
-    this.distributeJar(ClassUtil.findContainingJar(CopySource.class));
+    this.distributeJarByClassWithPriority(CopySource.class, 0);
   }
 
   /**
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 3b289c0..c12dea4 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index e54a4b6..0a4685f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -23,7 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +114,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private long currentPartitionReadRecordTime = 0;
   protected D currentPartitionLastSuccessfulRecord = null;
 
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
   public KafkaExtractor(WorkUnitState state) {
     super(state);
     this.workUnitState = state;
@@ -164,6 +168,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @SuppressWarnings("unchecked")
   @Override
   public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+    if (this.shutdownRequested.get()) {
+      return null;
+    }
+
     long readStartTime = System.nanoTime();
 
     while (!allPartitionsFinished()) {
@@ -245,6 +253,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     return null;
   }
 
+  @Override
+  public void shutdown()
+      throws JobShutdownException {
+    this.shutdownRequested.set(true);
+  }
+
   private boolean allPartitionsFinished() {
     return this.currentPartitionIdx != INITIAL_PARTITION_IDX && this.currentPartitionIdx >= this.highWatermark.size();
   }
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 88e0c10..71dd510 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -49,6 +49,8 @@ dependencies {
 
   compile externalDependency.avro
   compile externalDependency.avroMapredH2
+  compile externalDependency.calciteCore
+  //compile externalDependency.calciteAvatica
   compile externalDependency.commonsCli
   compile externalDependency.commonsConfiguration
   compile externalDependency.commonsEmail
@@ -83,8 +85,6 @@ dependencies {
   compile externalDependency.kryo
 
   testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
-  testCompile externalDependency.calciteCore
-  testCompile externalDependency.calciteAvatica
   testCompile externalDependency.jhyde
   testCompile externalDependency.testng
   testCompile externalDependency.hamcrest
@@ -150,6 +150,7 @@ jmh {
 }
 
 test {
+    systemProperty "org.jboss.byteman.verbose", "true"
     workingDir rootProject.rootDir
     maxParallelForks = 1
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e800107..ccc94cb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,8 +28,8 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
 import javax.annotation.Nullable;
+import lombok.Setter;
 
 
 /**
@@ -93,6 +94,8 @@ public class GobblinMultiTaskAttempt {
   private final Optional<String> containerIdOptional;
   private final Optional<StateStore<TaskState>> taskStateStoreOptional;
   private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+  @Setter
+  private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) -> false;
   private List<Task> tasks;
 
   /**
@@ -139,16 +142,39 @@ public class GobblinMultiTaskAttempt {
     this.tasks = runWorkUnits(countDownLatch);
     log.info("Waiting for submitted tasks of job {} to complete in container {}...", jobId,
         containerIdOptional.or(""));
-    while (countDownLatch.getCount() > 0) {
-      log.info(String.format("%d out of %d tasks of job %s are running in container %s", countDownLatch.getCount(),
-          countDownLatch.getRegisteredParties(), jobId, containerIdOptional.or("")));
-      if (countDownLatch.await(10, TimeUnit.SECONDS)) {
-        break;
+    try {
+      while (countDownLatch.getCount() > 0) {
+        if (this.interruptionPredicate.test(this)) {
+          log.info("Interrupting task execution due to satisfied predicate.");
+          interruptTaskExecution(countDownLatch);
+          break;
+        }
+        log.info(String.format("%d out of %d tasks of job %s are running in container %s", countDownLatch.getCount(),
+            countDownLatch.getRegisteredParties(), jobId, containerIdOptional.or("")));
+        if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+          break;
+        }
       }
+    } catch (InterruptedException interrupt) {
+      log.info("Job interrupted by InterrupedException.");
+      interruptTaskExecution(countDownLatch);
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }
 
+  private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
+    log.info("Job interrupted. Attempting a graceful shutdown of the job.");
+    this.tasks.forEach(Task::shutdown);
+    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
+      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
+      try {
+        this.taskExecutor.shutDown();
+      } catch (Throwable t) {
+        throw new RuntimeException("Failed to shutdown task executor.", t);
+      }
+    }
+  }
+
   /**
    * Commit {@link #tasks} by 1. calling {@link Task#commit()} in parallel; 2. executing any additional {@link CommitStep};
    * 3. persist task statestore.
@@ -468,7 +494,8 @@ public class GobblinMultiTaskAttempt {
   public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String containerId, JobState jobState,
       List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
       StateStore<TaskState> taskStateStore,
-      CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker)
+      CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker,
+      Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
       throws IOException, InterruptedException {
 
     // dump the work unit if tracking logs are enabled
@@ -480,6 +507,7 @@ public class GobblinMultiTaskAttempt {
     GobblinMultiTaskAttempt multiTaskAttempt =
         new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor,
             Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
+    multiTaskAttempt.setInterruptionPredicate(interruptionPredicate);
 
     multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
     return multiTaskAttempt;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c0f9271..45d3876 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.job.TaskProgress;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -67,7 +69,7 @@ import org.apache.gobblin.util.ImmutableProperties;
  *
  * @author Yinan Li
  */
-public class JobState extends SourceState {
+public class JobState extends SourceState implements JobProgress {
 
   /**
    * An enumeration of possible job states, which are identical to
@@ -248,6 +250,20 @@ public class JobState extends SourceState {
   }
 
   /**
+   * Get the currently elapsed time for this job.
+   * @return the elapsed time in milliseconds, or 0 if neither a start nor an end time has been recorded.
+   */
+  public long getElapsedTime() {
+    if (this.endTime > 0) {
+      return  this.endTime - this.startTime;
+    }
+    if (this.startTime > 0) {
+      return System.currentTimeMillis() - this.startTime;
+    }
+    return 0;
+  }
+
+  /**
    * Set job end time.
    *
    * @param endTime job end time
@@ -393,6 +409,11 @@ public class JobState extends SourceState {
     return ImmutableList.<TaskState>builder().addAll(this.taskStates.values()).build();
   }
 
+  @Override
+  public List<TaskState> getTaskProgress() {
+    return getTaskStates();
+  }
+
   /**
    * Create a {@link Map} from dataset URNs (as being specified by {@link ConfigurationKeys#DATASET_URN_KEY} to
    * {@link DatasetState} objects that represent the dataset states and store {@link TaskState}s corresponding
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 049a3ff..8ddd1d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -462,7 +462,7 @@ public class Task implements TaskIFace {
 
       RecordEnvelope recordEnvelope;
       // Extract, convert, and fork one source record at a time.
-      while (!shutdownRequested() && (recordEnvelope = extractor.readRecordEnvelope()) != null) {
+      while ((recordEnvelope = extractor.readRecordEnvelope()) != null) {
         onRecordExtract();
         AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
         if (watermarkTracker.isPresent()) {
@@ -473,6 +473,9 @@ public class Task implements TaskIFace {
               ackableWatermark.incrementAck());
         }
         ackableWatermark.ack();
+        if (shutdownRequested()) {
+          extractor.shutdown();
+        }
       }
     } else {
       RecordEnvelope record;
@@ -495,6 +498,9 @@ public class Task implements TaskIFace {
             throw new RuntimeException(e);
           }
         }
+        if (shutdownRequested()) {
+          extractor.shutdown();
+        }
       }
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index a9d99be..50bba37 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.gobblin.runtime.job.TaskProgress;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -59,7 +60,7 @@ import lombok.Getter;
  *
  * @author Yinan Li
  */
-public class TaskState extends WorkUnitState {
+public class TaskState extends WorkUnitState implements TaskProgress {
 
   // Built-in metric names
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index b6cc3b5..dced320 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -200,7 +200,12 @@ public class EmbeddedGobblin {
    * will appear first in the classpath. Default priority is 0.
    */
   public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> klazz, int priority) {
-    return distributeJarWithPriority(ClassUtil.findContainingJar(klazz), priority);
+    String jar = ClassUtil.findContainingJar(klazz);
+    if (jar == null) {
+      log.warn(String.format("Could not find jar for class %s. This is normal in test runs.", klazz));
+      return this;
+    }
+    return distributeJarWithPriority(jar, priority);
   }
 
   /**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
new file mode 100644
index 0000000..d36f68c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.io.IOException;
+
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.JobState;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link FiniteStateMachine} implementation to track the state of a Gobblin job executor.
+ */
+@Slf4j
+public class GobblinJobFiniteStateMachine extends FiniteStateMachine<GobblinJobFiniteStateMachine.JobFSMState> {
+
+	/**
+	 * Types of state the job can be in.
+	 */
+	public enum StateType {
+		PREPARING, RUNNING, INTERRUPTED, CANCELLED, SUCCESS, FAILED
+	}
+
+	/**
+	 * State of a job.
+	 */
+	@AllArgsConstructor(access = AccessLevel.PRIVATE)
+	@EqualsAndHashCode(of = "stateType")
+	@ToString
+	@Getter
+	public static class JobFSMState {
+		private final StateType stateType;
+	}
+
+	/**
+	 * A special {@link JobFSMState} that is aware of how to interrupt a running job.
+	 */
+	private class RunnableState extends JobFSMState implements StateWithCallbacks<JobFSMState> {
+		private final JobInterruptionPredicate jobInterruptionPredicate;
+
+		public RunnableState() {
+			super(StateType.RUNNING);
+			if (GobblinJobFiniteStateMachine.this.interruptGracefully == null) {
+				this.jobInterruptionPredicate = null;
+			} else {
+				this.jobInterruptionPredicate = new JobInterruptionPredicate(GobblinJobFiniteStateMachine.this.jobState,
+						GobblinJobFiniteStateMachine.this::interruptRunningJob, false);
+			}
+		}
+
+		@Override
+		public void onEnterState(@Nullable JobFSMState previousState) {
+			if (this.jobInterruptionPredicate != null) {
+				this.jobInterruptionPredicate.startAsync();
+			}
+		}
+
+		@Override
+		public void onLeaveState(JobFSMState nextState) {
+			if (this.jobInterruptionPredicate != null) {
+				this.jobInterruptionPredicate.stopAsync();
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return super.equals(o);
+		}
+
+		@Override
+		public int hashCode() {
+			return super.hashCode();
+		}
+	}
+
+	/**
+	 * A runnable that allows for {@link IOException}s.
+	 */
+	@FunctionalInterface
+	public interface RunnableWithIoException {
+		void run() throws IOException;
+	}
+
+	private final JobState jobState;
+	private final RunnableWithIoException interruptGracefully;
+	private final RunnableWithIoException killJob;
+
+	@lombok.Builder
+	private GobblinJobFiniteStateMachine(JobState jobState, RunnableWithIoException interruptGracefully,
+			RunnableWithIoException killJob) {
+		super(buildAllowedTransitions(), Sets.newHashSet(new JobFSMState(StateType.CANCELLED)), new JobFSMState(StateType.FAILED),
+				new JobFSMState(StateType.PREPARING));
+
+		if (jobState == null) {
+			throw new IllegalArgumentException("Job state is required.");
+		}
+
+		this.jobState = jobState;
+		this.interruptGracefully = interruptGracefully;
+		this.killJob = killJob;
+	}
+
+	/**
+	 * Callers should use this method to obtain the {@link JobFSMState} for a particular {@link StateType}, as the
+	 * {@link JobFSMState} might contain additional functionality like running other services, etc.
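+	 * @param stateType the {@link StateType} of the state to obtain.
+	 * @return a new {@link JobFSMState} of that type; for {@link StateType#RUNNING} the returned state carries enter/leave callbacks.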
+	 */
+	public JobFSMState getEndStateForType(StateType stateType) {
+		switch (stateType) {
+			case RUNNING:
+				return new RunnableState();
+			default:
+				return new JobFSMState(stateType);
+		}
+	}
+
+	private void interruptRunningJob() {
+		log.info("Interrupting job execution.");
+		try (FiniteStateMachine<JobFSMState>.Transition transition = startTransition(getEndStateForType(StateType.INTERRUPTED))) {
+				try {
+					this.interruptGracefully.run();
+				} catch (IOException ioe) {
+					transition.changeEndState(getEndStateForType(StateType.FAILED));
+				}
+		} catch (FiniteStateMachine.UnallowedTransitionException exc) {
+			log.error("Cannot interrupt job.", exc);
+		} catch (InterruptedException | FailedTransitionCallbackException exc) {
+			log.error("Cannot finish graceful job interruption. Killing job.", exc);
+			try {
+				this.killJob.run();
+			} catch (IOException ioe) {
+				log.error("Failed to kill job.", ioe);
+			}
+			if (exc instanceof FailedTransitionCallbackException) {
+				((FailedTransitionCallbackException) exc).getTransition().switchEndStateToErrorState();
+				((FailedTransitionCallbackException) exc).getTransition().closeWithoutCallbacks();
+			}
+		}
+	}
+
+	private static SetMultimap<JobFSMState, JobFSMState> buildAllowedTransitions() {
+		SetMultimap<JobFSMState, JobFSMState> transitions = HashMultimap.create();
+		transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.RUNNING));
+		transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.FAILED));
+		transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.INTERRUPTED));
+		transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.SUCCESS));
+		transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.FAILED));
+		transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.INTERRUPTED));
+		return transitions;
+	}
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
new file mode 100644
index 0000000..6f67657
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.ReflectivePredicateEvaluator;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * This class evaluates a predicate on the {@link JobProgress} of a job and calls a job interruption hook when the
+ * predicate is satisfied.
+ *
+ * It is used to preemptively stop jobs after they satisfy some completion predicate (e.g. more than 15 minutes have
+ * elapsed and at least 75% of tasks have finished).
+ */
+@Slf4j
+public class JobInterruptionPredicate extends AbstractScheduledService {
+
+	public static final String INTERRUPTION_SQL = "org.apache.gobblin.jobInterruptionPredicate.sql";
+
+	private final String sql;
+	private final ReflectivePredicateEvaluator evaluator;
+	private final JobProgress jobProgress;
+	private final Runnable jobInterruptionHook;
+
+	public JobInterruptionPredicate(JobState jobState, Runnable jobInterruptionHook, boolean autoStart) {
+		this(jobState, jobState.getProp(INTERRUPTION_SQL), jobInterruptionHook, autoStart);
+	}
+
+	protected JobInterruptionPredicate(JobProgress jobProgress, String predicate,
+			Runnable jobInterruptionHook, boolean autoStart) {
+		this.sql = predicate;
+
+		ReflectivePredicateEvaluator tmpEval = null;
+		if (this.sql != null) {
+			try {
+				tmpEval = new ReflectivePredicateEvaluator(this.sql, JobProgress.class, TaskProgress.class);
+			} catch (SQLException exc) {
+				log.warn("Job interruption predicate is invalid, will not preemptively interrupt job.", exc);
+			}
+		}
+		this.evaluator = tmpEval;
+		this.jobProgress = jobProgress;
+		this.jobInterruptionHook = jobInterruptionHook;
+
+		if (autoStart && this.sql != null) {
+			startAsync();
+		}
+	}
+
+	@Override
+	protected void runOneIteration() {
+		if (this.evaluator == null) {
+			stopAsync();
+			return;
+		}
+		switch (this.jobProgress.getState()) {
+			case PENDING:
+				return;
+			case RUNNING:
+				try {
+					List<Object> objects = Stream.concat(Stream.<Object>of(this.jobProgress), this.jobProgress.getTaskProgress().stream()).collect(
+							Collectors.toList());
+					if (this.evaluator.evaluate(objects)) {
+						log.info("Interrupting job due to satisfied job interruption predicate. Predicate: " + this.sql);
+						this.jobInterruptionHook.run();
+						stopAsync();
+					}
+				} catch (Throwable exc) {
+					log.warn("Failed to evaluate job interruption predicate. Will not preemptively interrupt job.", exc);
+					throw Throwables.propagate(exc);
+				}
+				break;
+			default:
+				log.info(String.format("Detected job finished with state %s. Stopping job interruption predicate.",
+						this.jobProgress.getState()));
+				stopAsync();
+		}
+	}
+
+	@Override
+	protected Scheduler scheduler() {
+		return Scheduler.newFixedDelaySchedule(30, 30, TimeUnit.SECONDS);
+	}
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
new file mode 100644
index 0000000..ad1308a
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.List;
+
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * Type used to retrieve the progress of a Gobblin job.
+ *
+ * The job interruption predicate passes this interface to a {@code ReflectivePredicateEvaluator}, which exposes the
+ * zero-argument methods declared here as columns that the predicate SQL can reference.
+ */
+public interface JobProgress {
+
+	/** @return the identifier of the job. */
+	String getJobId();
+	/** @return the number of tasks in the job. */
+	int getTaskCount();
+	/** @return the number of tasks that have completed. */
+	int getCompletedTasks();
+	/** @return the elapsed time of the job. NOTE(review): the time unit is not specified here; confirm with implementations. */
+	long getElapsedTime();
+	/** @return the current running state of the job. */
+	JobState.RunningState getState();
+
+	/** @return the progress of the individual tasks of the job. */
+	List<? extends TaskProgress> getTaskProgress();
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
new file mode 100644
index 0000000..1ec8d61
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * Interface used to retrieve the progress of a task in a Gobblin job.
+ *
+ * The job interruption predicate passes this interface to a {@code ReflectivePredicateEvaluator}, which exposes the
+ * zero-argument methods declared here as columns that the predicate SQL can reference.
+ */
+public interface TaskProgress {
+
+	/** @return the identifier of the job this task belongs to. */
+	String getJobId();
+	/** @return the identifier of the task. */
+	String getTaskId();
+	/** @return the current working state of the task. */
+	WorkUnitState.WorkingState getWorkingState();
+	/** @return whether the task has completed. */
+	boolean isCompleted();
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index fc0e197..6b6bc16 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,8 +144,12 @@ public class LocalJobLauncher extends AbstractJobLauncher {
       }
     });
 
+    Thread thisThread = Thread.currentThread();
+    JobInterruptionPredicate jobInterruptionPredicate =
+        new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(), true);
     GobblinMultiTaskAttempt.runWorkUnits(this.jobContext, workUnitsWithJobState, this.taskStateTracker,
         this.taskExecutor, GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+    jobInterruptionPredicate.stopAsync();
 
     if (this.cancellationRequested) {
       // Wait for the cancellation execution if it has been requested
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1f53054..529fe11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,6 +27,9 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +85,8 @@ import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
 import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
 import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.runtime.util.MetricGroup;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -110,6 +115,9 @@ import org.apache.gobblin.util.SerializationUtils;
  */
 public class MRJobLauncher extends AbstractJobLauncher {
 
+  private static final String INTERRUPT_JOB_FILE_NAME = "_INTERRUPT_JOB";
+  private static final String GOBBLIN_JOB_INTERRUPT_PATH_KEY = "gobblin.jobInterruptPath";
+
   private static final Logger LOG = LoggerFactory.getLogger(MRJobLauncher.class);
 
   private static final String JOB_NAME_PREFIX = "Gobblin-";
@@ -148,6 +156,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private final StateStore<TaskState> taskStateStore;
 
   private final int jarFileMaximumRetry;
+  private final Path interruptPath;
+  private final GobblinJobFiniteStateMachine fsm;
 
   public MRJobLauncher(Properties jobProps) throws Exception {
     this(jobProps, null);
@@ -170,6 +180,9 @@ public class MRJobLauncher extends AbstractJobLauncher {
       throws Exception {
     super(jobProps, metadataTags);
 
+    this.fsm = GobblinJobFiniteStateMachine.builder().jobState(jobContext.getJobState())
+        .interruptGracefully(this::interruptGracefully).killJob(this::killJob).build();
+
     this.conf = conf;
     // Put job configuration properties into the Hadoop configuration so they are available in the mappers
     JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps, this.conf);
@@ -185,6 +198,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     this.mrJobDir = new Path(
         new Path(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), this.jobContext.getJobName()),
         this.jobContext.getJobId());
+    this.interruptPath = new Path(this.mrJobDir, INTERRUPT_JOB_FILE_NAME);
     if (this.fs.exists(this.mrJobDir)) {
       LOG.warn("Job working directory already exists for job " + this.jobContext.getJobName());
       this.fs.delete(this.mrJobDir, true);
@@ -252,26 +266,35 @@ public class MRJobLauncher extends AbstractJobLauncher {
       this.taskStateCollectorService.startAsync().awaitRunning();
 
       LOG.info("Launching Hadoop MR job " + this.job.getJobName());
-      this.job.submit();
-      this.hadoopJobSubmitted = true;
+      try (FiniteStateMachine<JobFSMState>.Transition t = this.fsm.startTransition(this.fsm.getEndStateForType(StateType.RUNNING))) {
+        try {
+          this.job.submit();
+        } catch (Throwable exc) {
+          t.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+          throw exc;
+        }
+        this.hadoopJobSubmitted = true;
 
-      // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
-      if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
-        jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+        // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
+        if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+          jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+        }
+      } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+        LOG.error("Cannot start MR job.", unallowed);
       }
 
-      TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
-      LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
-      this.job.waitForCompletion(true);
-      mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));
+      if (this.fsm.getCurrentState().getStateType().equals(StateType.RUNNING)) {
+        TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
+        LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
 
-      if (this.cancellationRequested) {
-        // Wait for the cancellation execution if it has been requested
-        synchronized (this.cancellationExecution) {
-          if (this.cancellationExecuted) {
-            return;
-          }
-        }
+        this.job.waitForCompletion(true);
+        this.fsm.transitionIfAllowed(fsm.getEndStateForType(StateType.SUCCESS));
+
+        mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));
+      }
+
+      if (this.fsm.getCurrentState().getStateType().equals(StateType.CANCELLED)) {
+        return;
       }
 
       // Create a metrics set for this job run from the Hadoop counters.
@@ -286,18 +309,52 @@ public class MRJobLauncher extends AbstractJobLauncher {
 
   @Override
   protected void executeCancellation() {
-    try {
-      if (this.hadoopJobSubmitted && !this.job.isComplete()) {
-        LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
-        this.job.killJob();
-        // Collect final task states.
-        this.taskStateCollectorService.stopAsync().awaitTerminated();
+    // Cancellation is modelled as a transition of the job state machine to CANCELLED. The MR job is only killed
+    // when the transition starts from RUNNING; a failed kill turns the end state into FAILED instead.
+    try (FiniteStateMachine<JobFSMState>.Transition transition =
+        this.fsm.startTransition(this.fsm.getEndStateForType(StateType.CANCELLED))) {
+      if (transition.getStartState().getStateType().equals(StateType.RUNNING)) {
+        try {
+          killJob();
+        } catch (IOException ioe) {
+          // Keep the cause in the log so the reason for the failed kill is not lost.
+          LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId(), ioe);
+          transition.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+        }
+      }
+    } catch (GobblinJobFiniteStateMachine.FailedTransitionCallbackException exc) {
+      exc.getTransition().switchEndStateToErrorState();
+      exc.getTransition().closeWithoutCallbacks();
+    } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+      LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+    } catch (InterruptedException exc) {
+      // Restore the interrupt status so callers further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+    }
+  }
+
+  /**
+   * Attempt a graceful interruption of the running job.
+   *
+   * Creates the interrupt marker file, then polls once per second for up to 30 seconds for the Hadoop job to
+   * complete. If the job is still not complete afterwards (or the wait was interrupted), the job is killed.
+   * If the waiting thread was interrupted, its interrupt status is restored before returning.
+   *
+   * @throws IOException if creating the marker file, querying the job, or killing the job fails.
+   */
+  private void interruptGracefully() throws IOException {
+    LOG.info("Attempting graceful interruption of job " + this.jobContext.getJobId());
+
+    this.fs.createNewFile(this.interruptPath);
+
+    // Remember an interruption instead of swallowing it; the flag is restored only after the job calls below,
+    // so those calls do not run on a thread that is already flagged as interrupted.
+    boolean interrupted = false;
+    long waitTimeStart = System.currentTimeMillis();
+    while (!this.job.isComplete() && System.currentTimeMillis() < waitTimeStart + 30 * 1000) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        interrupted = true;
+        break;
       }
-    } catch (IllegalStateException ise) {
-      LOG.error("The Hadoop MR job has not started for job " + this.jobContext.getJobId());
-    } catch (IOException ioe) {
-      LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId());
     }
+
+    try {
+      if (!this.job.isComplete()) {
+        LOG.info("Interrupted job did not shut itself down after timeout. Killing job.");
+        this.job.killJob();
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Kills the Hadoop MR job, then stops the task state collector service and waits for it to terminate.
+   *
+   * @throws IOException if killing the Hadoop job fails.
+   */
+  private void killJob() throws IOException {
+    LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
+    this.job.killJob();
+    // Collect final task states.
+    this.taskStateCollectorService.stopAsync();
+    this.taskStateCollectorService.awaitTerminated();
   }
 
   /**
@@ -381,6 +438,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
           Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
     }
 
+    this.job.getConfiguration().set(GOBBLIN_JOB_INTERRUPT_PATH_KEY, this.interruptPath.toString());
+
     mrJobSetupTimer.stop();
   }
 
@@ -679,6 +738,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
     @Override
     public void run(Context context) throws IOException, InterruptedException {
       this.setup(context);
+
+      Path interruptPath = new Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
+      if (this.fs.exists(interruptPath)) {
+        LOG.info(String.format("Found interrupt path %s indicating the driver has interrupted the job, aborting mapper.", interruptPath));
+        return;
+      }
+
       GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
       try {
         // De-serialize and collect the list of WorkUnits to run
@@ -701,7 +767,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
         gobblinMultiTaskAttempt =
             GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
                 this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
-                multiTaskAttemptCommitPolicy, jobBroker);
+                multiTaskAttemptCommitPolicy, jobBroker, (gmta) -> {
+                  try {
+                    return this.fs.exists(interruptPath);
+                  } catch (IOException ioe) {
+                    return false;
+                  }
+                });
 
         if (this.isSpeculativeEnabled) {
           LOG.info("will not commit in task attempt");
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index f2cd04b..9a798c1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -15,40 +15,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.gobblin.runtime.spec_catalog;
 
 import java.io.Closeable;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
new file mode 100644
index 0000000..ead087d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+
+import lombok.Data;
+
+
+/**
+ * A predicate evaluator that uses interfaces to define table schemas and can evaluate SQL statements on instances of
+ * those interfaces. See {@link ReflectivePredicateEvaluatorTest} for examples.
+ *
+ * Note all evaluated statements must return a single row with a single boolean column.
+ *
+ * Usage:
+ * ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator("SELECT ... FROM myInterface", MyInterface.class);
+ * evaluator.evaluate(instance1, instance2, ...); // use the statement provided in constructor
+ * -- or --
+ * evaluator.evaluate("SELECT ... FROM myInterface", instance1, instance2, ...);
+ */
+public class ReflectivePredicateEvaluator implements Closeable {
+	// Operand keys of the inline Calcite model: the comma-separated interface names and this evaluator's identifier.
+	private static final String REFERENCE_INTERFACES = "refInterface";
+	private static final String OPERATOR_ID = "operatorId";
+	// Used to derive a field name from a method name, optionally stripping a leading "get".
+	private static final Pattern FIELD_NAME_EXTRACTOR = Pattern.compile("(?:get([A-Z]))?(.+)");
+
+	// Inline model template. Format arguments: schema factory class name, then the two operand key/value pairs.
+	private static final String MODEL_PATTERN = "{"
+			+ "version: 1, defaultSchema: 'MAIN',"
+			+ "schemas: ["
+			+ "{name: 'MAIN', type: 'custom', factory: '%s', operand: {%s: '%s', %s: '%d'}}"
+			+ "]}";
+	private static final String CONNECT_STRING_PATTERN = "jdbc:calcite:model=inline:%s";
+
+	// Maps evaluator identifier to evaluator so tables can look up the objects being evaluated.
+	// Values are weakly referenced, so an entry can disappear once its evaluator is no longer strongly reachable.
+	private static final Cache<Integer, ReflectivePredicateEvaluator> REGISTRY = CacheBuilder.newBuilder().weakValues().build();
+	// Source of unique evaluator identifiers.
+	private static final AtomicInteger IDENTIFIER = new AtomicInteger();
+
+	private final List<Class<?>> referenceInterfaces;
+	private final int identifier;
+	private final Connection conn;
+	// Prepared form of the default SQL statement.
+	private final PreparedStatement stmnt;
+
+	// Default SQL statement supplied at construction.
+	private final String sql;
+
+	// Objects of the evaluation currently in progress; set in evaluate and read by the tables when scanned.
+	private volatile List<Object> objects;
+
+	/**
+	 * Creates an evaluator: registers it under a fresh identifier, opens a Calcite connection using an inline model
+	 * built from the reference interfaces, and prepares and validates the default statement.
+	 *
+	 * @param sql The default SQL expression to run in this evaluator.
+	 * @param referenceInterfaces The interfaces that will be used to generate the table schemas.
+	 * @throws SQLException if the connection cannot be opened or the statement cannot be prepared.
+	 * @throws IllegalArgumentException if the statement does not return a single boolean column.
+	 */
+	public ReflectivePredicateEvaluator(String sql, Class<?>... referenceInterfaces) throws SQLException  {
+		this.referenceInterfaces = Lists.newArrayList(referenceInterfaces);
+		this.sql = sql;
+
+		this.identifier = IDENTIFIER.getAndIncrement();
+		// Registered under its identifier so that tables created from the model can find this instance.
+		REGISTRY.put(this.identifier, this);
+
+		String model = computeModel();
+		String connectString = String.format(CONNECT_STRING_PATTERN, model);
+
+		this.conn =
+				DriverManager.getConnection(connectString);
+		try {
+			this.stmnt = prepareStatement(sql);
+		} catch (Throwable t) {
+			// The instance never becomes usable, so release the connection instead of leaking it.
+			REGISTRY.invalidate(this.identifier);
+			try {
+				this.conn.close();
+			} catch (SQLException closeExc) {
+				t.addSuppressed(closeExc);
+			}
+			throw t;
+		}
+	}
+
+	/**
+	 * Prepares the given SQL on this evaluator's connection and validates its result shape.
+	 * The statement is closed again if validation fails.
+	 */
+	private PreparedStatement prepareStatement(String sql) throws SQLException {
+		PreparedStatement prepared = this.conn.prepareStatement(sql);
+		try {
+			validateSql(prepared, sql);
+			return prepared;
+		} catch (Throwable t) {
+			prepared.close();
+			throw t;
+		}
+	}
+
+	/**
+	 * Builds the inline model string: fills the model template with the schema factory class name, the
+	 * comma-separated names of the reference interfaces, and this evaluator's identifier.
+	 */
+	private String computeModel() {
+		List<String> interfaceNames =
+				this.referenceInterfaces.stream().map(Class::getName).collect(Collectors.toList());
+		String joinedNames = Joiner.on(",").join(interfaceNames);
+		String factoryName = PESchemaFactory.class.getName();
+		return String.format(MODEL_PATTERN, factoryName, REFERENCE_INTERFACES, joinedNames, OPERATOR_ID, this.identifier);
+	}
+
+	/**
+	 * Verifies that the prepared statement returns exactly one column and that the column is boolean.
+	 *
+	 * @throws IllegalArgumentException if the statement has any other result shape.
+	 */
+	private void validateSql(PreparedStatement stmnt, String sql) throws SQLException {
+		ResultSetMetaData metaData = stmnt.getMetaData();
+
+		// The column type is only inspected when there is exactly one column.
+		boolean singleBooleanColumn = metaData.getColumnCount() == 1 && metaData.getColumnType(1) == Types.BOOLEAN;
+		if (singleBooleanColumn) {
+			return;
+		}
+		throw new IllegalArgumentException("Statement is expected to return a single boolean column. Provided statement: " + sql);
+	}
+
+	/**
+	 * Evaluate the default predicate (the statement given at construction) on the provided objects.
+	 * @throws SQLException
+	 */
+	public boolean evaluate(Object... objects) throws SQLException{
+		List<Object> objectList = Lists.newArrayList(objects);
+		final String useDefaultSql = null;
+		return evaluate(objectList, useDefaultSql);
+	}
+
+	/**
+	 * Evaluate an ad-hoc predicate on the provided objects.
+	 * Note {@link #evaluate(Object...)} is preferable as it only does validation of the expression once.
+	 * @throws SQLException
+	 */
+	public boolean evaluate(String sql, Object... objects) throws SQLException{
+		List<Object> objectList = Lists.newArrayList(objects);
+		return evaluate(objectList, sql);
+	}
+
+	/**
+	 * Evaluate the default predicate (the statement given at construction) on the provided list of objects.
+	 * @throws SQLException
+	 */
+	public boolean evaluate(List<Object> objects) throws SQLException {
+		final String useDefaultSql = null;
+		return evaluate(objects, useDefaultSql);
+	}
+
+	/**
+	 * Evaluate an ad-hoc predicate on the list of provided objects, or the default predicate if {@code sql} is null.
+	 * Note {@link #evaluate(List)} is preferable as it only does validation of the expression once.
+	 *
+	 * The result is the conjunction of the boolean value of every returned row.
+	 *
+	 * @param objects the objects the statement is evaluated on.
+	 * @param sql the statement to evaluate, or null to use the statement given at construction.
+	 * @return true if every returned row is true.
+	 * @throws SQLException if preparing or executing the statement fails.
+	 * @throws IllegalArgumentException if the statement returns no rows, or an ad-hoc statement does not return a
+	 *         single boolean column.
+	 */
+	public boolean evaluate(List<Object> objects, String sql) throws SQLException {
+		// Evaluations are serialized because the objects are handed to the tables through a shared field.
+		synchronized (this) {
+			String actualSql = sql == null ? this.sql : sql;
+			PreparedStatement actualStmnt = null;
+			try {
+				actualStmnt = sql == null ? this.stmnt : prepareStatement(sql);
+
+				this.objects = objects;
+				actualStmnt.execute();
+				// Close the result set when done instead of leaving it open until the statement is reused or closed.
+				try (ResultSet rs = actualStmnt.getResultSet()) {
+					if (!rs.next()) {
+						throw new IllegalArgumentException("Expected at least one returned row. SQL evaluated: " + actualSql);
+					}
+					boolean result = true;
+					do {
+						result &= rs.getBoolean(1);
+					} while (rs.next());
+					return result;
+				}
+			} finally {
+				// Only ad-hoc statements are closed here; the default statement lives until close().
+				if (sql != null && actualStmnt != null) {
+					actualStmnt.close();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Closes the default statement and the underlying connection.
+	 * The connection is closed even if closing the statement fails.
+	 *
+	 * @throws IOException wrapping any {@link SQLException} raised while closing.
+	 */
+	@Override
+	public void close()
+			throws IOException {
+		try {
+			try {
+				if (this.stmnt != null) {
+					this.stmnt.close();
+				}
+			} finally {
+				if (this.conn != null) {
+					this.conn.close();
+				}
+			}
+		} catch (SQLException exc) {
+			throw new IOException("Failed to close " + ReflectivePredicateEvaluator.class.getSimpleName(), exc);
+		}
+	}
+
+	/**
+	 * Calcite {@link SchemaFactory} used for the evaluator.
+	 * This class is public because Calcite uses reflection to instantiate it, there is no reason to use it anywhere else
+	 * in Gobblin.
+	 */
+	public static class PESchemaFactory implements SchemaFactory {
+
+		/**
+		 * Creates a schema with one table per reference interface listed in the operand. Each table is named after
+		 * the upper-cased simple name of its interface and is bound to the evaluator identifier from the operand.
+		 *
+		 * @throws RuntimeException if one of the listed interfaces cannot be loaded.
+		 */
+		@Override
+		public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+			try {
+				List<Class<?>> referenceInterfaces = new ArrayList<>();
+				for (String iface : Splitter.on(",").splitToList(operand.get(REFERENCE_INTERFACES).toString())) {
+					referenceInterfaces.add(Class.forName(iface));
+				}
+				int operatorIdentifier = Integer.parseInt(operand.get(OPERATOR_ID).toString());
+
+				return new AbstractSchema() {
+					@Override
+					protected Map<String, Table> getTableMap() {
+						HashMap<String, Table> map = new HashMap<>();
+						for (Class<?> iface : referenceInterfaces) {
+							// Upper-case with a fixed locale so table names do not depend on the JVM default locale
+							// (e.g. the Turkish dotted capital I).
+							map.put(iface.getSimpleName().toUpperCase(java.util.Locale.ROOT),
+									new PETable(iface, operatorIdentifier));
+						}
+						return map;
+					}
+				};
+			} catch (ReflectiveOperationException roe) {
+				throw new RuntimeException(roe);
+			}
+		}
+	}
+
+	/**
+	 * Calcite table whose rows are the objects of the current evaluation that are instances of the reference
+	 * interface, and whose columns are derived from that interface's zero-argument methods.
+	 */
+	@Data
+	private static class PETable extends AbstractTable implements ProjectableFilterableTable {
+		// Interface that defines the columns and selects which objects become rows.
+		private final Class<?> referenceInterface;
+		// Identifier used to look up the owning evaluator in REGISTRY.
+		private final int operatorIdentifier;
+		// Set once initialize() has built rowType and methodsForFields.
+		private volatile boolean initialized = false;
+
+		// Row type returned from getRowType; built in initialize().
+		private RelDataType rowType;
+		// One value extractor per column, in column order.
+		private List<Function<Object, Object>> methodsForFields = new ArrayList<>();
+
+		/**
+		 * Produces the rows of this table for the evaluation currently in progress.
+		 *
+		 * Looks up the owning evaluator by identifier, keeps the objects that are instances of the reference
+		 * interface, and maps each to an array holding the projected column values. The filters are not applied here.
+		 *
+		 * @param projects indices of the columns to return, or null for all columns.
+		 * @throws IllegalStateException if the owning evaluator is no longer registered or has no objects set.
+		 */
+		@Override
+		public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) {
+			// REGISTRY holds evaluators weakly, so the entry may be gone; fail with a clear message rather than an NPE.
+			ReflectivePredicateEvaluator evaluator = REGISTRY.getIfPresent(this.operatorIdentifier);
+			if (evaluator == null) {
+				throw new IllegalStateException(
+						"No evaluator registered with identifier " + this.operatorIdentifier + ". It may have been garbage collected.");
+			}
+			List<Object> list = evaluator.objects;
+			if (list == null) {
+				throw new IllegalStateException("No objects have been provided for evaluation.");
+			}
+
+			final int[] actualProjects = resolveProjects(projects);
+
+			// isInstance also skips null elements instead of failing on them.
+			final List<Object[]> rows = list.stream()
+					.filter(o -> referenceInterface.isInstance(o))
+					.map(
+					m -> {
+						Object[] res = new Object[actualProjects.length];
+						for (int i = 0; i < actualProjects.length; i++) {
+							res[i] = methodsForFields.get(actualProjects[i]).apply(m);
+						}
+						return res;
+					}
+			).collect(Collectors.toList());
+
+			return new AbstractEnumerable<Object[]>() {
+				@Override
+				public Enumerator<Object[]> enumerator() {
+					// A fresh enumerator per call, so the enumerable can be iterated more than once.
+					return Linq4j.enumerator(rows);
+				}
+			};
+		}
+
+		/**
+		 * Returns the given projection, or the identity projection over all columns when {@code projects} is null.
+		 */
+		private int[] resolveProjects(int[] projects) {
+			if (projects != null) {
+				return projects;
+			}
+			int[] allColumns = new int[methodsForFields.size()];
+			for (int column = 0; column < allColumns.length; column++) {
+				allColumns[column] = column;
+			}
+			return allColumns;
+		}
+
+		@Override
+		public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+			initialize((JavaTypeFactory) typeFactory);
+			return this.rowType;
+		}
+
+		/**
+		 * Builds the row type and the per-column value extractors from the reference interface, once.
+		 *
+		 * Every zero-argument method of the interface becomes a column named after the upper-cased field name
+		 * derived from the method name. Enum return types are exposed as strings.
+		 */
+		private synchronized void initialize(JavaTypeFactory typeFactory) {
+			if (this.initialized) {
+				return;
+			}
+
+			this.methodsForFields = new ArrayList<>();
+			List<RelDataTypeField> fields = new ArrayList<>();
+
+			for (Method method : this.referenceInterface.getMethods()) {
+				if (method.getParameterCount() != 0) {
+					// Only zero-argument methods can be read as column values.
+					continue;
+				}
+				String fieldName = computeFieldName(method.getName());
+				if (fieldName == null) {
+					continue;
+				}
+				this.methodsForFields.add(extractorForMethod(method));
+				// Enum values are handed out as strings, so the column is typed accordingly.
+				Class<?> retType = method.getReturnType().isEnum() ? String.class : method.getReturnType();
+				// Upper-case with a fixed locale so column names do not depend on the JVM default locale.
+				fields.add(new RelDataTypeFieldImpl(fieldName.toUpperCase(java.util.Locale.ROOT), fields.size(),
+						typeFactory.createType(retType)));
+			}
+
+			this.rowType = new MyDataType(fields, referenceInterface);
+			this.initialized = true;
+		}
+
+		private Function<Object, Object> extractorForMethod(Method method) {
+			return o -> {
+				try {
+					Object ret = method.invoke(o);
+					return method.getReturnType().isEnum() ? ret.toString() : ret;
+				} catch (ReflectiveOperationException roe) {
+					throw new RuntimeException(roe);
+				}
+			};
+		}
+
+	}
+
+	private static class MyDataType extends RelDataTypeImpl {
+		private final String typeString;
+
+		public MyDataType(List<? extends RelDataTypeField> fieldList, Class<?> refInterface) {
+			super(fieldList);
+			this.typeString = refInterface.getName();
+			computeDigest();
+		}
+
+		@Override
+		protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+			sb.append(typeString);
+		}
+	}
+
+	/**
+	 * Derives a field name from a method name. A leading "get" followed by an upper-case letter is stripped
+	 * (e.g. "getJobId" becomes "JobId"); any other name is returned unchanged (e.g. "isCompleted").
+	 *
+	 * @return the field name, or null if the method name does not match the expected pattern.
+	 */
+	private static String computeFieldName(String methodName) {
+		Matcher matcher = FIELD_NAME_EXTRACTOR.matcher(methodName);
+		if (!matcher.matches()) {
+			return null;
+		}
+		// Group 1 is only present for "getX..." names; concatenating a null group would yield the text "null".
+		String firstLetter = matcher.group(1);
+		return firstLetter == null ? matcher.group(2) : firstLetter + matcher.group(2);
+	}
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
new file mode 100644
index 0000000..21625a8
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * Tests for {@link JobInterruptionPredicate}, driven by settable {@link JobProgress} / {@link TaskProgress} stubs.
+ * Each test runs the predicate manually through {@code runOneIteration()} and checks whether the interruption
+ * callback has flipped an {@link AtomicBoolean}.
+ */
+public class JobInterruptionPredicateTest {
+
+	/**
+	 * A predicate over the job progress must trigger the callback only once the condition becomes true.
+	 */
+	@Test
+	public void testJobPredicate() {
+		SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING, new ArrayList<>());
+		AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+		JobInterruptionPredicate predicate =
+				new JobInterruptionPredicate(jobProgress, "SELECT completedTasks > 5 FROM jobProgress", () -> atomicBoolean.set(true), false);
+
+		// completedTasks = 0: predicate is false, callback must not run
+		predicate.runOneIteration();
+		Assert.assertFalse(atomicBoolean.get());
+
+		jobProgress.completedTasks = 6;
+
+		predicate.runOneIteration();
+		Assert.assertTrue(atomicBoolean.get());
+	}
+
+	/**
+	 * A predicate over the task progress must trigger the callback only once a task matches the condition.
+	 */
+	@Test
+	public void testTaskPredicate() {
+		// Distinct task ids: the two stubs stand for two different tasks of the same job.
+		SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+		SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+		SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+				Lists.newArrayList(t1, t2));
+		AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+		JobInterruptionPredicate predicate =
+				new JobInterruptionPredicate(jobProgress, "SELECT count(*) > 0 FROM taskProgress WHERE workingState = 'FAILED'", () -> atomicBoolean.set(true), false);
+
+		// No failed task yet
+		predicate.runOneIteration();
+		Assert.assertFalse(atomicBoolean.get());
+
+		t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+		predicate.runOneIteration();
+		Assert.assertTrue(atomicBoolean.get());
+	}
+
+	/**
+	 * A predicate combining job and task progress must trigger the callback when either sub-condition holds, and must
+	 * not trigger it when neither does.
+	 */
+	@Test
+	public void testTaskAndJobPredicate() {
+		// Distinct task ids: the two stubs stand for two different tasks of the same job.
+		SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+		SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+		SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+				Lists.newArrayList(t1, t2));
+		AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+		JobInterruptionPredicate predicate =
+				new JobInterruptionPredicate(jobProgress,
+						"SELECT EXISTS(SELECT * FROM (SELECT completedTasks > 5 AS pred FROM jobProgress UNION SELECT count(*) > 0 AS pred FROM taskProgress WHERE workingState = 'FAILED') WHERE pred)",
+						() -> atomicBoolean.set(true), false);
+
+		// Neither sub-condition holds
+		predicate.runOneIteration();
+		Assert.assertFalse(atomicBoolean.get());
+
+		// Only the task sub-condition holds
+		t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+		predicate.runOneIteration();
+		Assert.assertTrue(atomicBoolean.get());
+		atomicBoolean.set(false);
+
+		// Back to neither sub-condition holding
+		t2.workingState = WorkUnitState.WorkingState.RUNNING;
+
+		predicate.runOneIteration();
+		Assert.assertFalse(atomicBoolean.get());
+
+		// Only the job sub-condition holds
+		jobProgress.completedTasks = 6;
+
+		predicate.runOneIteration();
+		Assert.assertTrue(atomicBoolean.get());
+	}
+
+	/**
+	 * {@link JobProgress} stub whose non-final fields are mutated directly by the tests.
+	 */
+	@Getter
+	@AllArgsConstructor
+	public static class SettableJobProgress implements JobProgress {
+		private final String jobId;
+		private int taskCount;
+		private int completedTasks;
+		private long elapsedTime;
+		private JobState.RunningState runningState;
+		private List<TaskProgress> taskProgress;
+
+		@Override
+		public JobState.RunningState getState() {
+			return this.runningState;
+		}
+	}
+
+	/**
+	 * {@link TaskProgress} stub whose non-final fields are mutated directly by the tests.
+	 */
+	@Getter
+	@AllArgsConstructor
+	public static class SettableTaskProgress implements TaskProgress {
+		private final String jobId;
+		private final String taskId;
+		private WorkUnitState.WorkingState workingState;
+		private boolean isCompleted;
+	}
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
new file mode 100644
index 0000000..ba7dc4b
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+
+public class ReflectivePredicateEvaluatorTest {
+
+	@Test
+	public void simpleTest() throws Exception {
+		ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+				"SELECT anInt = 1 FROM myInterface", MyInterface.class);
+
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo")));
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+
+		Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+				new MyImplementation(1, "bar")));
+		Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+				new MyImplementation(2, "foo")));
+		Assert.assertFalse(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+				new MyImplementation(2, "bar")));
+	}
+
+	@Test
+	public void testWithAggregations() throws Exception {
+		ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+				"SELECT sum(anInt) = 5 FROM myInterface", MyInterface.class);
+
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo")));
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation(4, "foo")));
+	}
+
+	@Test
+	public void testWithAggregationsAndFilter() throws Exception {
+		ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+				"SELECT sum(anInt) = 5 FROM myInterface WHERE aString = 'foo'", MyInterface.class);
+
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "bar")));
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "foo")));
+	}
+
+	@Test
+	public void testMultipleInterfaces() throws Exception {
+		ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+				"SELECT true = ALL (SELECT sum(anInt) = 2 AS satisfied FROM myInterface UNION SELECT sum(anInt) = 3 AS satisfied FROM myInterface2)",
+				MyInterface.class, MyInterface2.class);
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation2(3)));
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation2(3), new MyImplementation(1, "foo")));
+	}
+
+	@Test
+	public void testMultipleOutputs() throws Exception {
+		ReflectivePredicateEvaluator evaluator =
+				new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface", MyInterface.class);
+		Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(1, "foo")));
+		Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(2, "foo")));
+	}
+
+	@Test
+	public void testInvalidSQL() throws Exception {
+		try {
+			ReflectivePredicateEvaluator evaluator =
+					new ReflectivePredicateEvaluator("SELECT anInt FROM myInterface", MyInterface.class);
+			Assert.fail();
+		} catch (IllegalArgumentException exc) {
+			// Expected
+		}
+	}
+
+	@Test
+	public void testNoOutputs() throws Exception {
+		try {
+			ReflectivePredicateEvaluator evaluator =
+					new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface WHERE aString = 'foo'",
+							MyInterface.class);
+			evaluator.evaluate(new MyImplementation(1, "bar"));
+			Assert.fail();
+		} catch (IllegalArgumentException exc) {
+			// Expected
+		}
+	}
+
+	private interface MyInterface {
+		int getAnInt();
+		String getAString();
+	}
+
+	@Data
+	private static class MyImplementation implements MyInterface {
+		private final int anInt;
+		private final String aString;
+	}
+
+	private interface MyInterface2 {
+		int getAnInt();
+	}
+
+	@Data
+	private static class MyImplementation2 implements MyInterface2 {
+		private final int anInt;
+	}
+
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
new file mode 100644
index 0000000..20a2bf8
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of a basic FiniteStateMachine that allows keeping track of its state and gating certain
+ * logic on whether a transition is valid or not.
+ *
+ * This class is useful in situations where logic is complex, possibly multi-threaded, and can take multiple paths. Certain
+ * pieces of logic (for example running a job, publishing a dataset, etc) can only happen if other actions ended correctly,
+ * and the FSM is a way of simplifying the encoding and verification of those conditions. It is understood that state
+ * transitions may not be instantaneous, and that other state transitions should not start until the current one has
+ * been resolved.
+ *
+ * All public methods of this class will wait until the FSM is in a non-transitioning state. If multiple transitions are
+ * queued at the same time, the order in which they are executed is essentially random.
+ *
+ * The states supported by FSM can be enums or instances of any base type. The legality of a transition is determined
+ * by equality, i.e. if a transition A -> B is legal, the current state is A' and the desired end state is B', the transition
+ * will be legal if A.equals(A') && B.equals(B'). This allows for storing additional information into the current state
+ * as long as it does not affect the equality check (i.e. fields that are not compared in the equals check can store
+ * state metadata, etc.).
+ *
+ * Suggested Usage:
+ * FiniteStateMachine<MySymbols> fsm = new FiniteStateMachine.Builder<MySymbols>().addTransition(START_SYMBOL, END_SYMBOL).build(initialSymbol);
+ *
+ * try (Transition transition = fsm.startTransition(MY_END_STATE)) {
+ *   try {
+ *     // my logic
+ *   } catch (MyException exc) {
+ *     transition.changeEndState(MY_ERROR);
+ *   }
+ * } catch (UnallowedTransitionException exc) {
+ *   // Cannot execute logic because it's an illegal transition!
+ * } catch (ReentrantStableStateWait exc) {
+ * 	 // Somewhere in the logic an instruction tried to do an operation with the fsm that would likely cause a deadlock
+ * } catch (AbandonedTransitionException exc) {
+ *   // Another thread initiated a transition and became inactive ending the transition
+ * } catch (InterruptedException exc) {
+ *   // Could not start transition because thread got interrupted while waiting for a non-transitioning state
+ * } catch (FailedTransitionCallbackException exc) {
+ *   // A callback in the transition start or end states has failed.
+ *   exc.getTransition().changeEndState(MY_ERROR).closeWithoutCallbacks(); // example handling
+ * }
+ *
+ * @param <T>
+ */
+@Slf4j
+public class FiniteStateMachine<T> {
+
+	/**
+	 * Used to build a {@link FiniteStateMachine} instance.
+	 */
+	public static class Builder<T> {
+		private final SetMultimap<T, T> allowedTransitions;
+		private final Set<T> universalEnds;
+    private T errorState;
+
+    public Builder() {
+			this.allowedTransitions = HashMultimap.create();
+			this.universalEnds = new HashSet<>();
+		}
+
+		/**
+		 * Add a legal transition to the {@link FiniteStateMachine}.
+		 */
+		public Builder<T> addTransition(T startState, T endState) {
+			this.allowedTransitions.put(startState, endState);
+			return this;
+		}
+
+		/**
+		 * Specify that a state is a valid end state for a transition starting from any state. Useful for example for
+		 * error states.
+		 */
+		public Builder<T> addUniversalEnd(T state) {
+			this.universalEnds.add(state);
+			return this;
+		}
+
+    /**
+     * Specify the error state to which this machine can transition if nothing else is possible. Note the error state
+     * is always an allowed end state.
+     */
+		public Builder<T> errorState(T state) {
+		  this.errorState = state;
+		  return this;
+    }
+
+		/**
+		 * Build a {@link FiniteStateMachine} starting at the given initial state.
+		 */
+		public FiniteStateMachine<T> build(T initialState) {
+			return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, initialState);
+		}
+	}
+
+	private final SetMultimap<T, T> allowedTransitions;
+	private final Set<T> universalEnds;
+	private final T errorState;
+
+	private final ReentrantReadWriteLock lock;
+	private final Condition condition;
+	private final T initialState;
+
+	private volatile T currentState;
+	private volatile Transition currentTransition;
+
+	/**
+	 * Create a machine positioned at the given initial state. If the initial state is a {@link StateWithCallbacks},
+	 * its {@code onEnterState} callback is invoked with a null previous state.
+	 *
+	 * @param allowedTransitions legal transitions, keyed by start state. Stored as-is, not copied.
+	 * @param universalEnds states that are legal end states from any start state. Stored as-is, not copied.
+	 * @param errorState error state of the machine; always a legal end state. May be null.
+	 * @param initialState state the machine starts at; also the state used by {@link #cloneAtInitialState()}.
+	 */
+	protected FiniteStateMachine(SetMultimap<T, T> allowedTransitions, Set<T> universalEnds, T errorState, T initialState) {
+		this.allowedTransitions = allowedTransitions;
+		this.universalEnds = universalEnds;
+		this.errorState = errorState;
+
+		this.lock = new ReentrantReadWriteLock();
+		// Conditions are only available on the write lock, so waiting for a stable state always needs the write lock.
+		this.condition = this.lock.writeLock().newCondition();
+		this.initialState = initialState;
+		this.currentState = initialState;
+
+		// There is no previous state when the machine is created, hence the null argument.
+		if (this.currentState instanceof StateWithCallbacks) {
+		  ((StateWithCallbacks) this.currentState).onEnterState(null);
+    }
+	}
+
+	/**
+	 * Start a transition to the end state specified. The returned {@link Transition} object is a closeable that will finalize
+	 * the transition when it is closed. While the transition is open, no other transition can start.
+	 *
+	 * It is recommended to call this method only within a try-with-resource block to ensure the transition is closed.
+	 *
+	 * @throws UnallowedTransitionException If the transition is not allowed.
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 * @throws ReentrantStableStateWait if the calling thread is the owner of the currently open transition.
+	 * @throws AbandonedTransitionException if the thread owning the currently open transition is no longer alive.
+	 */
+	public Transition startTransition(T endState) throws UnallowedTransitionException, InterruptedException {
+		// Acquire the lock before entering the try block so the finally clause never unlocks a lock that is not held.
+		this.lock.writeLock().lock();
+		try {
+			// await() releases the write lock while waiting, letting the owner of the open transition close it.
+			while (isTransitioning()) {
+				this.condition.await();
+			}
+			if (!isAllowedTransition(this.currentState, endState)) {
+				throw new UnallowedTransitionException(this.currentState, endState);
+			}
+			Transition transition = new Transition(endState);
+			this.currentTransition = transition;
+			return transition;
+		} finally {
+			this.lock.writeLock().unlock();
+		}
+	}
+
+	/**
+	 * Move the machine to the given end state in a single step: a transition is started with
+	 * {@link #startTransition(Object)} and closed right away with {@link Transition#close()}.
+	 *
+	 * @throws UnallowedTransitionException if the transition is not allowed.
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 * @throws FailedTransitionCallbackException if a state callback fails while closing the transition.
+	 */
+	public void transitionImmediately(T endState) throws UnallowedTransitionException, InterruptedException, FailedTransitionCallbackException {
+		startTransition(endState).close();
+	}
+
+	/**
+	 * Attempt an immediate transition to the given end state, reporting rather than throwing when it is not legal.
+	 *
+	 * @return true if the transition happened, false if it is not an allowed transition.
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 * @throws FailedTransitionCallbackException if a state callback fails while closing the transition.
+	 */
+	public boolean transitionIfAllowed(T endState) throws InterruptedException, FailedTransitionCallbackException {
+		try {
+			transitionImmediately(endState);
+			return true;
+		} catch (UnallowedTransitionException notAllowed) {
+			return false;
+		}
+	}
+
+	/**
+	 * Get the current state. This method will wait until the FSM is in a non-transitioning state (although a transition
+	 * may start immediately after).
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 */
+	public T getCurrentState() throws InterruptedException {
+		// Need to get lock to make sure we're not in transitioning state. It is acquired before the try block so the
+		// finally clause never unlocks a lock that was not taken.
+		this.lock.readLock().lock();
+		try {
+			waitForNonTransitioningReadLock();
+
+			return this.currentState;
+		} finally {
+			this.lock.readLock().unlock();
+		}
+	}
+
+	/**
+	 * Read the current state without taking any lock and without waiting for an open transition to be closed.
+	 */
+	@VisibleForTesting
+	T getCurrentStateEvenIfTransitioning() {
+		return this.currentState;
+	}
+
+	/**
+	 * Does not take any lock and does not wait for an open transition. The clone is given the same transition
+	 * collections and error state as this FSM, and building it invokes {@code onEnterState(null)} on the initial state
+	 * if that state is a {@link StateWithCallbacks}.
+	 *
+	 * @return A clone of this FSM starting at the initial state of the FSM.
+	 */
+	public FiniteStateMachine<T> cloneAtInitialState() {
+		return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, this.initialState);
+	}
+
+	/**
+	 * Waits until the FSM is in a non-transitioning state before reading the state to clone from.
+	 *
+	 * @return A clone of this FSM starting at the current state of the FSM.
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 */
+	public FiniteStateMachine<T> cloneAtCurrentState() throws InterruptedException {
+		// Acquire the lock before entering the try block so the finally clause never unlocks a lock that is not held.
+		this.lock.readLock().lock();
+		try {
+			waitForNonTransitioningReadLock();
+
+			return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, this.currentState);
+		} finally {
+			this.lock.readLock().unlock();
+		}
+	}
+
+	/**
+	 * Waits for a read lock in a non-transitioning state. The caller MUST hold the read lock before calling this method,
+	 * and holds it again when this method exits, whether it returns normally or throws.
+	 *
+	 * The condition can only be awaited under the write lock, so while waiting the read lock is released and the write
+	 * lock is taken instead.
+	 *
+	 * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+	 */
+	private void waitForNonTransitioningReadLock() throws InterruptedException {
+		if (isTransitioning()) {
+			this.lock.readLock().unlock();
+			// To use the condition, need to upgrade to a write lock
+			this.lock.writeLock().lock();
+			try {
+				while (isTransitioning()) {
+					this.condition.await();
+				}
+			} finally {
+				// Downgrade to the read lock on every exit path. Callers release the read lock in their own finally
+				// block, so leaving without it (e.g. when await() is interrupted or isTransitioning() throws) would make
+				// that unlock fail with an IllegalMonitorStateException and hide the original exception.
+				this.lock.readLock().lock();
+				this.lock.writeLock().unlock();
+			}
+		}
+	}
+
+	/**
+	 * Check whether a transition is currently open.
+	 *
+	 * @return true if a transition is open and owned by another, live thread; false if no transition is open.
+	 * @throws ReentrantStableStateWait if the open transition is owned by the calling thread.
+	 * @throws AbandonedTransitionException if the thread owning the open transition is no longer alive.
+	 */
+	private boolean isTransitioning() {
+		Transition ongoing = this.currentTransition;
+		if (ongoing == null) {
+			return false;
+		}
+		if (Thread.currentThread().equals(ongoing.ownerThread)) {
+			throw new ReentrantStableStateWait(
+					"Tried to check for non-transitioning state from a thread that had already initiated a transition, "
+							+ "this may indicate a deadlock. To change end state use Transition.changeEndState() instead.");
+		}
+		if (!ongoing.ownerThread.isAlive()) {
+			throw new AbandonedTransitionException(ongoing.ownerThread);
+		}
+		return true;
+	}
+
+	/**
+	 * Decide whether moving from one state to another is legal. A transition is legal when the end state equals the
+	 * error state, is a universal end state, or was registered as an end state for the given start state.
+	 *
+	 * @param startState state the transition starts from.
+	 * @param endState desired end state; must not be null.
+	 * @return true if the transition is legal.
+	 */
+	protected boolean isAllowedTransition(T startState, T endState) {
+		boolean alwaysAllowed = endState.equals(this.errorState) || this.universalEnds.contains(endState);
+		return alwaysAllowed || this.allowedTransitions.containsEntry(startState, endState);
+	}
+
+	/**
+	 * A handle used for controlling the transition of the {@link FiniteStateMachine}. Note if this handle is lost the
+	 * {@link FiniteStateMachine} will likely go into an invalid state.
+	 */
+	public class Transition implements Closeable {
+		private final Thread ownerThread;
+		private volatile T endState;
+		private volatile boolean closed;
+
+		private Transition(T endState) {
+			// The thread creating the transition is recorded as its owner.
+			this.ownerThread = Thread.currentThread();
+			this.endState = endState;
+			this.closed = false;
+		}
+
+		/**
+		 * Get the state at the beginning of this transition, i.e. the state the {@link FiniteStateMachine} stays in
+		 * until this transition is closed.
+		 *
+		 * @throws IllegalStateException if the transition is already closed.
+		 */
+		public T getStartState() {
+			if (this.closed) {
+				throw new IllegalStateException("Transition already closed.");
+			}
+			return FiniteStateMachine.this.currentState;
+		}
+
+		/**
+		 * Change the end state of the transition. The new end state must be a legal transition for the state when the
+		 * {@link Transition} was created.
+		 *
+		 * @throws UnallowedTransitionException if the new end state is not an allowed transition.
+		 * @throws IllegalStateException if the transition is already closed.
+		 */
+		public synchronized void changeEndState(T endState) throws UnallowedTransitionException {
+			if (this.closed) {
+				throw new IllegalStateException("Transition already closed.");
+			}
+			if (!isAllowedTransition(FiniteStateMachine.this.currentState, endState)) {
+				throw new UnallowedTransitionException(FiniteStateMachine.this.currentState, endState);
+			}
+			this.endState = endState;
+		}
+
+		/**
+		 * Change the end state of the transition to the FSM error state.
+		 */
+		public synchronized void switchEndStateToErrorState() {
+			// NOTE(review): unlike changeEndState() this neither rejects an already closed transition nor checks that
+			// an error state was configured (it is null unless the builder set one) -- confirm that is intended.
+			this.endState = FiniteStateMachine.this.errorState;
+		}
+
+		/**
+		 * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks.
+		 *
+		 * @throws FailedTransitionCallbackException when start or end state callbacks fail. Note if this exception is thrown
+		 * the transition is not complete and the error must be handled to complete it.
+		 */
+		@Override
+		public void close() throws FailedTransitionCallbackException {
+			doClose(true);
+		}
+
+		/**
+		 * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks without
+		 * calling any callbacks. This method should only be called after a {@link #close()} has failed and the failure
+		 * cannot be handled.
+		 */
+		public void closeWithoutCallbacks() {
+			try {
+				doClose(false);
+			} catch (FailedTransitionCallbackException exc) {
+				throw new IllegalStateException(String.format("Close without callbacks threw a %s. This is an error in code.",
+						FailedTransitionCallbackException.class), exc);
+			}
+		}
+
+		/**
+		 * Finish the transition: optionally run the leave/enter callbacks, then move the machine to the end state and
+		 * wake up every thread waiting for a non-transitioning state. Does nothing if the transition is already closed.
+		 *
+		 * @param withCallbacks whether to invoke {@link StateWithCallbacks} callbacks on the start and end states.
+		 * @throws FailedTransitionCallbackException if a callback throws; in that case the transition stays open.
+		 */
+		private synchronized void doClose(boolean withCallbacks) throws FailedTransitionCallbackException {
+			if (this.closed) {
+				return;
+			}
+
+			// Take the lock before the try block so the finally clause only ever releases a lock that is held.
+			FiniteStateMachine.this.lock.writeLock().lock();
+			try {
+				// The start state is notified first, then the end state; either failure aborts the close.
+				try {
+					if (withCallbacks && getStartState() instanceof StateWithCallbacks) {
+						((StateWithCallbacks<T>) getStartState()).onLeaveState(this.endState);
+					}
+				} catch (Throwable t) {
+					throw new FailedTransitionCallbackException(this, FailedCallback.START_STATE, t);
+				}
+
+				try {
+					if (withCallbacks && this.endState instanceof StateWithCallbacks) {
+						((StateWithCallbacks) this.endState).onEnterState(getStartState());
+					}
+				} catch (Throwable t) {
+					throw new FailedTransitionCallbackException(this, FailedCallback.END_STATE, t);
+				}
+
+				this.closed = true;
+
+				FiniteStateMachine.this.currentState = this.endState;
+				FiniteStateMachine.this.currentTransition = null;
+				FiniteStateMachine.this.condition.signalAll();
+			} finally {
+				FiniteStateMachine.this.lock.writeLock().unlock();
+			}
+		}
+	}
+
+	/**
+	 * If a transition is not allowed to happen.
+	 */
+	@Getter
+	public static class UnallowedTransitionException extends Exception {
+		private final Object startState;
+		private final Object endState;
+
+		public UnallowedTransitionException(Object startState, Object endState) {
+			super(String.format("Unallowed transition: %s -> %s", startState, endState));
+			this.startState = startState;
+			this.endState = endState;
+		}
+	}
+
+	/**
+	 * Thrown when a thread that has started a transition is waiting for a non-transitioning state, which is a deadlock situation.
+	 */
+	public static class ReentrantStableStateWait extends RuntimeException {
+		public ReentrantStableStateWait(String message) {
+			super(message);
+		}
+	}
+
+	/**
+	 * Thrown when a transition was initiated by a thread that no longer exists, likely implying that the transition can
+	 * never be closed. The thread that started the transition is available through the generated getter.
+	 */
+	@Getter
+	public static class AbandonedTransitionException extends RuntimeException {
+		private final Thread startingThread;
+
+		public AbandonedTransitionException(Thread startingThread) {
+			super(String.format("Thread %s initiated a transition but became inactive before closing it.", startingThread));
+			this.startingThread = startingThread;
+		}
+	}
+
+	/**
+	 * Identifies which callback failed while closing a transition: {@code START_STATE} for the callback run on the
+	 * start state, {@code END_STATE} for the callback run on the end state.
+	 */
+	public enum FailedCallback {
+	  START_STATE, END_STATE
+  }
+
+  /**
+   * Thrown when a state callback fails while a transition is being closed. Carries the transition that could not be
+   * closed, which callback failed, and the exception thrown by that callback (also set as the cause).
+   */
+  @Getter
+	public static class FailedTransitionCallbackException extends IOException {
+	  private final FiniteStateMachine.Transition transition;
+	  private final FailedCallback failedCallback;
+	  private final Throwable originalException;
+
+    public FailedTransitionCallbackException(FiniteStateMachine<?>.Transition transition, FailedCallback failedCallback,
+        Throwable originalException) {
+      super("Failed callbacks when ending transition.", originalException);
+      this.transition = transition;
+      this.failedCallback = failedCallback;
+      this.originalException = originalException;
+    }
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
new file mode 100644
index 0000000..9a77a7d
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * A state for a {@link FiniteStateMachine} which supports callbacks when entering and leaving the state.
+ * Both callbacks default to no-ops, so implementations only override the ones they need.
+ * @param <T> supertype of states in the FSM.
+ */
+public interface StateWithCallbacks<T> {
+
+	/**
+	 * Called when an FSM reaches this state.
+	 * @param previousState the previous state of the machine, or null when there is none (for example when the
+	 *                      machine is created at this state).
+	 */
+	default void onEnterState(@Nullable T previousState) {
+		// do nothing
+	}
+
+	/**
+	 * Called when an FSM leaves this state.
+	 * @param nextState the next state of the machine.
+	 */
+	default void onLeaveState(T nextState) {
+		// do nothing
+	}
+
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
new file mode 100644
index 0000000..ec50d02
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class FiniteStateMachineTest {
+
+	public enum MyStates { // plain enum states used by refFsm and the tests that clone it
+		PENDING, RUNNING, SUCCESS, ERROR
+	}
+
+	private final FiniteStateMachine<MyStates> refFsm = new FiniteStateMachine.Builder<MyStates>() // template machine; the tests below work on clones of it
+			.addTransition(MyStates.PENDING, MyStates.RUNNING)
+			.addTransition(MyStates.RUNNING, MyStates.SUCCESS) // SUCCESS is registered only as a successor of RUNNING
+			.addTransition(MyStates.PENDING, MyStates.ERROR) // ERROR is registered as a successor of both PENDING and RUNNING
+			.addTransition(MyStates.RUNNING, MyStates.ERROR).build(MyStates.PENDING); // built with PENDING as the starting state
+
+	@Test
+	public void singleThreadImmediateTransitionsTest() throws Exception {
+		FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+		// Happy path: PENDING -> RUNNING -> SUCCESS, checking the current state after each immediate transition.
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		fsm.transitionImmediately(MyStates.RUNNING);
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+		fsm.transitionImmediately(MyStates.SUCCESS);
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.SUCCESS);
+		// A clone taken at the initial state starts over at PENDING, from where ERROR is directly reachable.
+		fsm = fsm.cloneAtInitialState();
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		fsm.transitionImmediately(MyStates.ERROR);
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+		// A clone taken at the current state is expected to report the state the source machine was in (ERROR).
+		fsm = fsm.cloneAtCurrentState();
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+	}
+
+	@Test
+	public void illegalTransitionsTest() throws Exception {
+		FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+		// Transitions that were never registered on refFsm must be rejected and must leave the current state untouched.
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		try {
+			fsm.transitionImmediately(MyStates.PENDING);
+			Assert.fail();
+		} catch (FiniteStateMachine.UnallowedTransitionException exc) {
+			// expected: PENDING -> PENDING is not a registered transition
+		}
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		// PENDING -> SUCCESS is not registered either; SUCCESS was only added as a successor of RUNNING.
+		try {
+			fsm.transitionImmediately(MyStates.SUCCESS);
+			Assert.fail();
+		} catch (FiniteStateMachine.UnallowedTransitionException exc) {
+			// expected: PENDING -> SUCCESS is not a registered transition
+		}
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		// After the rejected attempts a registered transition must still go through.
+		fsm.transitionImmediately(MyStates.RUNNING);
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+	}
+
+	@Test
+	public void slowTransitionsTest() throws Exception {
+		FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+		// While a transition opened with startTransition() is still open, the thread that opened it must not wait on the FSM again.
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		try (FiniteStateMachine<MyStates>.Transition transition = fsm.startTransition(MyStates.RUNNING)) {
+			try {
+				fsm.getCurrentState();
+				Assert.fail();
+			} catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+				// Expected because the same thread that is transitioning tries to read the current state
+			}
+			try {
+				fsm.transitionImmediately(MyStates.RUNNING);
+				Assert.fail();
+			} catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+				// Expected because the same thread that is transitioning tries to start another transition
+			}
+		}
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING); // the new state is visible once the transition has been closed
+		// The end state of an open transition can be redirected before it is closed (SUCCESS is swapped for ERROR here).
+		try (FiniteStateMachine<MyStates>.Transition transition = fsm.startTransition(MyStates.SUCCESS)) {
+			transition.changeEndState(MyStates.ERROR);
+		}
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+	}
+
+	@Test
+	public void callbackTest() throws Exception {
+	  NamedStateWithCallback stateA = new NamedStateWithCallback("a");
+    NamedStateWithCallback stateB = new NamedStateWithCallback("b");
+    NamedStateWithCallback stateC = new NamedStateWithCallback("c", null, s -> {
+      throw new RuntimeException("leave");
+    });
+    NamedStateWithCallback stateD = new NamedStateWithCallback("d");
+    // The builder is given fresh instances; NamedStateWithCallback equality is by name only, so they match the states above.
+    FiniteStateMachine<NamedStateWithCallback> fsm = new FiniteStateMachine.Builder<NamedStateWithCallback>()
+        .addTransition(new NamedStateWithCallback("a"), new NamedStateWithCallback("b"))
+        .addTransition(new NamedStateWithCallback("b"), new NamedStateWithCallback("c"))
+        .addTransition(new NamedStateWithCallback("c"), new NamedStateWithCallback("d"))
+        .addUniversalEnd(new NamedStateWithCallback("ERROR"))
+        .build(stateA);
+
+    fsm.transitionImmediately(stateB);
+    // a -> b: neither state has a custom callback, so each one records the transition it took part in.
+    Assert.assertEquals(fsm.getCurrentState(), stateB);
+    Assert.assertEquals(stateA.lastTransition, "leave:a->b");
+    stateA.lastTransition = "";
+    Assert.assertEquals(stateB.lastTransition, "enter:a->b");
+    stateB.lastTransition = "";
+
+    try {
+      // State that will error on enter
+      fsm.transitionImmediately(new NamedStateWithCallback("c", s -> {
+        throw new RuntimeException("enter");
+      }, s -> {
+        throw new RuntimeException("leave");
+      }));
+      Assert.fail("Expected exception");
+    } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+      Assert.assertEquals(exc.getFailedCallback(), FiniteStateMachine.FailedCallback.END_STATE);
+      Assert.assertEquals(exc.getOriginalException().getMessage(), "enter");
+      // switch state to one that will only error on leave
+      exc.getTransition().changeEndState(stateC);
+      exc.getTransition().close();
+    }
+    // The recovered transition ends in stateC, and the b -> c callbacks are recorded on stateB and stateC.
+    Assert.assertEquals(fsm.getCurrentState(), stateC);
+    Assert.assertEquals(stateB.lastTransition, "leave:b->c");
+    stateB.lastTransition = "";
+    Assert.assertEquals(stateC.lastTransition, "enter:b->c");
+    stateC.lastTransition = "";
+
+    try {
+      fsm.transitionImmediately(stateD);
+      Assert.fail("Expected exception");
+    } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+      Assert.assertEquals(exc.getFailedCallback(), FiniteStateMachine.FailedCallback.START_STATE);
+      Assert.assertEquals(exc.getOriginalException().getMessage(), "leave");
+      // redirect the failed transition to the universal end state and finish it without running callbacks
+      exc.getTransition().changeEndState(new NamedStateWithCallback("ERROR"));
+      exc.getTransition().closeWithoutCallbacks();
+    }
+    // The machine ends in ERROR, and stateD never recorded a transition because it was never entered.
+    Assert.assertEquals(fsm.getCurrentState(), new NamedStateWithCallback("ERROR"));
+    Assert.assertEquals(stateD.lastTransition, "");
+  }
+
+	@Test(timeOut = 5000)
+	public void multiThreadTest() throws Exception {
+		FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+		// Two competing slow transitions on the same machine: t1 targets RUNNING, t2 targets ERROR.
+		Transitioner<MyStates> t1 = new Transitioner<>(fsm, MyStates.RUNNING);
+		Transitioner<MyStates> t2 = new Transitioner<>(fsm, MyStates.ERROR);
+		// Start t1 and wait until it reports that its transition is open; the machine must still show PENDING.
+		Thread t1Thread = new Thread(null, t1, "t1");
+		t1Thread.start();
+		t1.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+		Assert.assertEquals(t1.transitionResult, TransitionState.TRANSITIONING);
+		Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.PENDING);
+		// Start t2 while t1's transition is still open; t2 is expected to still report STARTING.
+		Thread t2Thread = new Thread(null, t2, "t2");
+		t2Thread.start();
+		Assert.assertEquals(t1.transitionResult, TransitionState.TRANSITIONING);
+		Assert.assertEquals(t2.transitionResult, TransitionState.STARTING);
+		Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.PENDING);
+		// Interrupting t1 cuts its sleep short so its transition completes; t2 is then expected to open its own.
+		t1Thread.interrupt();
+		t1.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+		t2.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+		Assert.assertEquals(t1.transitionResult, TransitionState.COMPLETED);
+		Assert.assertEquals(t2.transitionResult, TransitionState.TRANSITIONING);
+		Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.RUNNING);
+		// Interrupting t2 likewise completes its transition, leaving the machine in ERROR.
+		t2Thread.interrupt();
+		t2.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+		Assert.assertEquals(t1.transitionResult, TransitionState.COMPLETED);
+		Assert.assertEquals(t2.transitionResult, TransitionState.COMPLETED);
+		Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.ERROR);
+	}
+
+	@Test(timeOut = 5000)
+	public void deadThreadTest() throws Exception {
+		FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+		Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+		Thread t = new Thread(() -> {
+			try {
+				fsm.startTransition(MyStates.RUNNING); // deliberately never closed
+			} catch (FiniteStateMachine.UnallowedTransitionException | InterruptedException exc) {
+				// ignored: if no transition was started, the Assert.fail() below reports the problem
+			}
+			// since we don't close the transition, it should become orphaned
+		});
+		t.start();
+
+		// Wait for the thread to terminate before touching the machine again.
+		// join() returns as soon as the thread is dead, without the latency of polling isAlive().
+		t.join();
+
+		try {
+			fsm.transitionImmediately(MyStates.RUNNING);
+			Assert.fail();
+		} catch (FiniteStateMachine.AbandonedTransitionException exc) {
+			// Expected: the transition opened by the dead thread was never closed
+		}
+	}
+
+	@Data
+	private class Transitioner<T> implements Runnable { // opens one slow transition of fsm towards endState and publishes its progress
+		private final FiniteStateMachine<T> fsm;
+		private final T endState;
+		// lock and condition are used by goToState() and awaitState() to publish and await changes of transitionResult
+		private final Lock lock = new ReentrantLock();
+		private final Condition condition = lock.newCondition();
+
+		private volatile boolean running = false;
+		private volatile TransitionState transitionResult = TransitionState.STARTING;
+
+		@Override
+		public void run() {
+			try (FiniteStateMachine<T>.Transition transition = this.fsm.startTransition(this.endState)) {
+				goToState(TransitionState.TRANSITIONING);
+				try {
+					Thread.sleep(2000);
+					goToState(TransitionState.TIMEOUT); // nobody interrupted us; signal it like every other result so waiters wake up
+					return;
+				} catch (InterruptedException ie) {
+					// This is the signal to end the state transition, so do nothing
+				}
+			} catch (InterruptedException exc) {
+				goToState(TransitionState.INTERRUPTED);
+				return;
+			} catch (FiniteStateMachine.UnallowedTransitionException exc) {
+				goToState(TransitionState.UNALLOWED);
+				return;
+			} catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+				goToState(TransitionState.CALLBACK_ERROR);
+				return;
+			}
+			goToState(TransitionState.COMPLETED); // reached only after the transition was closed without an exception
+		}
+		// Blocks the caller until transitionResult is one of the given states.
+		public void awaitState(Set<TransitionState> states) throws InterruptedException {
+			this.lock.lock(); // acquire before the try so that finally never unlocks a lock we do not hold
+			try {
+				while (!states.contains(this.transitionResult)) {
+					this.condition.await();
+				}
+			} finally {
+				this.lock.unlock();
+			}
+		}
+		// Records the new result and wakes up every thread blocked in awaitState().
+		public void goToState(TransitionState state) {
+			this.lock.lock(); // acquire before the try so that finally never unlocks a lock we do not hold
+			try {
+				this.transitionResult = state;
+				this.condition.signalAll();
+			} finally {
+				this.lock.unlock();
+			}
+		}
+	}
+
+	enum TransitionState { // progress/outcome values a Transitioner publishes through its transitionResult field
+		STARTING, TRANSITIONING, COMPLETED, INTERRUPTED, UNALLOWED, TIMEOUT, CALLBACK_ERROR
+	}
+
+	@RequiredArgsConstructor
+  @EqualsAndHashCode(of = "name")
+	public static class NamedStateWithCallback implements StateWithCallbacks<NamedStateWithCallback> { // test state; equality and hash use the name only
+	  @Getter
+	  private final String name;
+	  private final Function<NamedStateWithCallback, Void> enterCallback; // when non-null, runs instead of recording in onEnterState
+	  private final Function<NamedStateWithCallback, Void> leaveCallback; // when non-null, runs instead of recording in onLeaveState
+
+	  String lastTransition = ""; // "<callback>:<start>-><end>" of the last recorded callback; tests reset it to "" between checks
+
+    public NamedStateWithCallback(String name) {
+      this(name, null, null); // no custom callbacks: both onEnterState and onLeaveState will record into lastTransition
+    }
+    // Formats and stores the transition description; a null start state is rendered as the text "null".
+    private void setLastTransition(String callback, NamedStateWithCallback start, NamedStateWithCallback end) {
+      this.lastTransition = String.format("%s:%s->%s", callback, start == null ? "null" : start.name, end.name);
+    }
+
+    @Override
+    public void onEnterState(@Nullable NamedStateWithCallback previousState) {
+      if (this.enterCallback == null) {
+        setLastTransition("enter", previousState, this);
+      } else {
+        this.enterCallback.apply(previousState); // custom callback replaces the recording; the tests use it to throw
+      }
+    }
+
+    @Override
+    public void onLeaveState(NamedStateWithCallback nextState) {
+      if (this.leaveCallback == null) {
+        setLastTransition("leave", this, nextState);
+      } else {
+        this.leaveCallback.apply(nextState); // custom callback replaces the recording; the tests use it to throw
+      }
+    }
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 8509cc6..e772acd 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -25,7 +25,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     .register(new BuildProperty("nexusArtifactSnapshotRepository", "https://repository.apache.org/content/repositories/snapshots", "Maven repository to publish artifacts"))
     .register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies version"))
     .register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies version"))
-    .register(new BuildProperty("bytemanVersion", "2.2.1", "Byteman dependencies version"))
+    .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman dependencies version"))
     .register(new BuildProperty("confluentVersion", "2.0.1", "confluent dependencies version"))
     .register(new BuildProperty("doNotSignArtifacts", false, "Do not sight Maven artifacts"))
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor (see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index f7ff29c..0009397 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -121,8 +121,8 @@ ext.externalDependency = [
     "bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
     "bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
     "bcprovJdk15on": "org.bouncycastle:bcprov-jdk15on:1.52",
-    "calciteCore": "org.apache.calcite:calcite-core:1.3.0-incubating",
-    "calciteAvatica": "org.apache.calcite:calcite-avatica:1.3.0-incubating",
+    "calciteCore": "org.apache.calcite:calcite-core:1.16.0",
+    "calciteAvatica": "org.apache.calcite:calcite-avatica:1.13.0",
     "jhyde": "org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde",
     "curatorFramework": "org.apache.curator:curator-framework:2.10.0",
     "curatorRecipes": "org.apache.curator:curator-recipes:2.10.0",
diff --git a/gradle/scripts/globalDependencies.gradle b/gradle/scripts/globalDependencies.gradle
index d64db67..270a121 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -43,6 +43,7 @@ subprojects {
         // Required to add JDK's tool jar, which is required to run byteman tests.
         testCompile (files(((URLClassLoader) ToolProvider.getSystemToolClassLoader()).getURLs()))
       }
+      all*.exclude group: 'org.apache.calcite', module: 'calcite-avatica' // replaced by org.apache.calcite.avatica:avatica-core
     }
   }
 }