You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/04 18:08:36 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-677] Allow
early termination of Gobblin jobs based on a predicate on the job progress
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9e95bd3 [GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate on the job progress
9e95bd3 is described below
commit 9e95bd32d6115175b532dbbd9f7e73bae8059c02
Author: ibuenros <is...@gmail.com>
AuthorDate: Mon Mar 4 10:08:31 2019 -0800
[GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate on the job progress
Closes #2548 from ibuenros/early-termination-2
---
.../gobblin/runtime/JobShutdownException.java | 27 ++
.../apache/gobblin/source/extractor/Extractor.java | 31 +-
.../runtime/embedded/EmbeddedGobblinDistcp.java | 2 +-
.../embedded/EmbeddedGobblinDistcpTest.java | 1 +
.../extractor/extract/kafka/KafkaExtractor.java | 14 +
gobblin-runtime/build.gradle | 5 +-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 42 +-
.../java/org/apache/gobblin/runtime/JobState.java | 23 +-
.../main/java/org/apache/gobblin/runtime/Task.java | 8 +-
.../java/org/apache/gobblin/runtime/TaskState.java | 3 +-
.../gobblin/runtime/embedded/EmbeddedGobblin.java | 7 +-
.../runtime/job/GobblinJobFiniteStateMachine.java | 180 ++++++++
.../runtime/job/JobInterruptionPredicate.java | 111 +++++
.../apache/gobblin/runtime/job/JobProgress.java | 38 ++
.../apache/gobblin/runtime/job/TaskProgress.java | 33 ++
.../gobblin/runtime/local/LocalJobLauncher.java | 5 +
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 126 ++++--
.../spec_catalog/SpecCatalogListenersList.java | 34 --
.../gobblin/util/ReflectivePredicateEvaluator.java | 370 ++++++++++++++++
.../runtime/job/JobInterruptionPredicateTest.java | 133 ++++++
.../util/ReflectivePredicateEvaluatorTest.java | 125 ++++++
.../org/apache/gobblin/fsm/FiniteStateMachine.java | 463 +++++++++++++++++++++
.../org/apache/gobblin/fsm/StateWithCallbacks.java | 45 ++
.../apache/gobblin/fsm/FiniteStateMachineTest.java | 344 +++++++++++++++
gradle/scripts/defaultBuildProperties.gradle | 2 +-
gradle/scripts/dependencyDefinitions.gradle | 4 +-
gradle/scripts/globalDependencies.gradle | 1 +
27 files changed, 2095 insertions(+), 82 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
new file mode 100644
index 0000000..2c7d55a
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * An exception thrown when a job cannot be gracefully shut down (for example, by an extractor that does not support shutdown).
+ */
+public class JobShutdownException extends Exception {
+ public JobShutdownException(String message) {
+ super(message);
+ }
+}
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
index 9749795..2ea900b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
@@ -23,8 +23,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.Decorator;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import io.reactivex.Emitter;
@@ -54,7 +56,7 @@ public interface Extractor<S, D> extends Closeable {
* @return schema of the extracted data records
* @throws java.io.IOException if there is problem getting the schema
*/
- public S getSchema() throws IOException;
+ S getSchema() throws IOException;
/**
* Read the next data record from the data source.
@@ -78,7 +80,7 @@ public interface Extractor<S, D> extends Closeable {
*
* @return the expected source record count
*/
- public long getExpectedRecordCount();
+ long getExpectedRecordCount();
/**
* Get the calculated high watermark up to which data records are to be extracted.
@@ -88,7 +90,23 @@ public interface Extractor<S, D> extends Closeable {
* <a href="https://github.com/linkedin/gobblin/wiki/Watermarks">Watermarks</a> for more information.
*/
@Deprecated
- public long getHighWatermark();
+ long getHighWatermark();
+
+ /**
+ * Called to notify the Extractor it should shut down as soon as possible. If this call returns successfully, the task
+ * will continue consuming records from the Extractor and continue execution normally. The extractor should only emit
+ * those records necessary to stop at a graceful committable state. Most job executors will eventually kill the task
+ * if the Extractor does not stop emitting records after a few seconds.
+ * The default implementation forwards the call to the decorated object when this is a {@link Decorator} wrapping an Extractor.
+ * @throws JobShutdownException if the extractor does not support early termination. This will cause the task to fail.
+ */
+ default void shutdown() throws JobShutdownException {
+ if (this instanceof Decorator && ((Decorator) this).getDecoratedObject() instanceof Extractor) {
+ ((Extractor) ((Decorator) this).getDecoratedObject()).shutdown(); // delegate to the wrapped extractor
+ } else {
+ throw new JobShutdownException(this.getClass().getName() + ": Extractor does not support shutdown."); // default: unsupported
+ }
+ }
/**
* Read an {@link RecordEnvelope}. By default, just wrap {@link #readRecord(Object)} in a {@link RecordEnvelope}.
@@ -115,7 +133,12 @@ public interface Extractor<S, D> extends Closeable {
S schema = getSchema();
Flowable<StreamEntity<D>> recordStream = Flowable.generate(() -> shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state, emitter) -> {
if (state.get()) {
- emitter.onComplete();
+ // shutdown requested
+ try {
+ shutdown();
+ } catch (JobShutdownException exc) {
+ emitter.onError(exc);
+ }
}
try {
StreamEntity<D> record = readStreamEntity();
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index e52748f..f7076bd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -82,7 +82,7 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, to.getFileSystem(new Configuration()).getUri().toString());
// add gobblin-data-management jar to distributed jars
- this.distributeJar(ClassUtil.findContainingJar(CopySource.class));
+ this.distributeJarByClassWithPriority(CopySource.class, 0);
}
/**
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 3b289c0..c12dea4 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index e54a4b6..0a4685f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -23,7 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +114,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private long currentPartitionReadRecordTime = 0;
protected D currentPartitionLastSuccessfulRecord = null;
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
public KafkaExtractor(WorkUnitState state) {
super(state);
this.workUnitState = state;
@@ -164,6 +168,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
@SuppressWarnings("unchecked")
@Override
public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+ if (this.shutdownRequested.get()) {
+ return null;
+ }
+
long readStartTime = System.nanoTime();
while (!allPartitionsFinished()) {
@@ -245,6 +253,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
return null;
}
+ @Override
+ public void shutdown()
+ throws JobShutdownException {
+ this.shutdownRequested.set(true); // checked at the top of readRecordImpl, which returns null once the flag is set
+ }
+
private boolean allPartitionsFinished() {
return this.currentPartitionIdx != INITIAL_PARTITION_IDX && this.currentPartitionIdx >= this.highWatermark.size();
}
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 88e0c10..71dd510 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -49,6 +49,8 @@ dependencies {
compile externalDependency.avro
compile externalDependency.avroMapredH2
+ compile externalDependency.calciteCore
+ //compile externalDependency.calciteAvatica
compile externalDependency.commonsCli
compile externalDependency.commonsConfiguration
compile externalDependency.commonsEmail
@@ -83,8 +85,6 @@ dependencies {
compile externalDependency.kryo
testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
- testCompile externalDependency.calciteCore
- testCompile externalDependency.calciteAvatica
testCompile externalDependency.jhyde
testCompile externalDependency.testng
testCompile externalDependency.hamcrest
@@ -150,6 +150,7 @@ jmh {
}
test {
+ systemProperty "org.jboss.byteman.verbose", "true"
workingDir rootProject.rootDir
maxParallelForks = 1
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e800107..ccc94cb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,8 +28,8 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import javax.annotation.Nullable;
+import lombok.Setter;
/**
@@ -93,6 +94,8 @@ public class GobblinMultiTaskAttempt {
private final Optional<String> containerIdOptional;
private final Optional<StateStore<TaskState>> taskStateStoreOptional;
private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+ @Setter
+ private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) -> false;
private List<Task> tasks;
/**
@@ -139,16 +142,39 @@ public class GobblinMultiTaskAttempt {
this.tasks = runWorkUnits(countDownLatch);
log.info("Waiting for submitted tasks of job {} to complete in container {}...", jobId,
containerIdOptional.or(""));
- while (countDownLatch.getCount() > 0) {
- log.info(String.format("%d out of %d tasks of job %s are running in container %s", countDownLatch.getCount(),
- countDownLatch.getRegisteredParties(), jobId, containerIdOptional.or("")));
- if (countDownLatch.await(10, TimeUnit.SECONDS)) {
- break;
+ try {
+ while (countDownLatch.getCount() > 0) {
+ if (this.interruptionPredicate.test(this)) {
+ log.info("Interrupting task execution due to satisfied predicate.");
+ interruptTaskExecution(countDownLatch);
+ break;
+ }
+ log.info(String.format("%d out of %d tasks of job %s are running in container %s", countDownLatch.getCount(),
+ countDownLatch.getRegisteredParties(), jobId, containerIdOptional.or("")));
+ if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+ break;
+ }
}
+ } catch (InterruptedException interrupt) {
+ log.info("Job interrupted by InterrupedException.");
+ interruptTaskExecution(countDownLatch);
}
log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
}
+ private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
+ log.info("Job interrupted. Attempting a graceful shutdown of the job.");
+ this.tasks.forEach(Task::shutdown); // request shutdown on every task
+ if (!countDownLatch.await(5, TimeUnit.SECONDS)) { // wait up to 5 seconds for the latch to reach zero
+ log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
+ try {
+ this.taskExecutor.shutDown(); // fallback after the timeout: shut down the task executor
+ } catch (Throwable t) {
+ throw new RuntimeException("Failed to shutdown task executor.", t); // cause is preserved
+ }
+ }
+ }
+
/**
* Commit {@link #tasks} by 1. calling {@link Task#commit()} in parallel; 2. executing any additional {@link CommitStep};
* 3. persist task statestore.
@@ -468,7 +494,8 @@ public class GobblinMultiTaskAttempt {
public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String containerId, JobState jobState,
List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
StateStore<TaskState> taskStateStore,
- CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker)
+ CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker,
+ Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
throws IOException, InterruptedException {
// dump the work unit if tracking logs are enabled
@@ -480,6 +507,7 @@ public class GobblinMultiTaskAttempt {
GobblinMultiTaskAttempt multiTaskAttempt =
new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor,
Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
+ multiTaskAttempt.setInterruptionPredicate(interruptionPredicate);
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
return multiTaskAttempt;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c0f9271..45d3876 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.job.TaskProgress;
import org.apache.hadoop.io.Text;
import com.codahale.metrics.Counter;
@@ -67,7 +69,7 @@ import org.apache.gobblin.util.ImmutableProperties;
*
* @author Yinan Li
*/
-public class JobState extends SourceState {
+public class JobState extends SourceState implements JobProgress {
/**
* An enumeration of possible job states, which are identical to
@@ -248,6 +250,20 @@ public class JobState extends SourceState {
}
/**
+ * Get the currently elapsed time for this job (presumably in milliseconds, since it is compared against System.currentTimeMillis()).
+ * @return end time minus start time if an end time is set; otherwise the time since start if a start time is set; otherwise 0
+ */
+ public long getElapsedTime() {
+ if (this.endTime > 0) {
+ return this.endTime - this.startTime;
+ }
+ if (this.startTime > 0) {
+ return System.currentTimeMillis() - this.startTime;
+ }
+ return 0;
+ }
+
+ /**
* Set job end time.
*
* @param endTime job end time
@@ -393,6 +409,11 @@ public class JobState extends SourceState {
return ImmutableList.<TaskState>builder().addAll(this.taskStates.values()).build();
}
+ @Override
+ public List<TaskState> getTaskProgress() {
+ return getTaskStates(); // TaskState implements TaskProgress, so the task states serve as the per-task progress
+ }
+
/**
* Create a {@link Map} from dataset URNs (as being specified by {@link ConfigurationKeys#DATASET_URN_KEY} to
* {@link DatasetState} objects that represent the dataset states and store {@link TaskState}s corresponding
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 049a3ff..8ddd1d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -462,7 +462,7 @@ public class Task implements TaskIFace {
RecordEnvelope recordEnvelope;
// Extract, convert, and fork one source record at a time.
- while (!shutdownRequested() && (recordEnvelope = extractor.readRecordEnvelope()) != null) {
+ while ((recordEnvelope = extractor.readRecordEnvelope()) != null) {
onRecordExtract();
AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
if (watermarkTracker.isPresent()) {
@@ -473,6 +473,9 @@ public class Task implements TaskIFace {
ackableWatermark.incrementAck());
}
ackableWatermark.ack();
+ if (shutdownRequested()) {
+ extractor.shutdown();
+ }
}
} else {
RecordEnvelope record;
@@ -495,6 +498,9 @@ public class Task implements TaskIFace {
throw new RuntimeException(e);
}
}
+ if (shutdownRequested()) {
+ extractor.shutdown();
+ }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index a9d99be..50bba37 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
+import org.apache.gobblin.runtime.job.TaskProgress;
import org.apache.hadoop.io.Text;
import com.codahale.metrics.Counter;
@@ -59,7 +60,7 @@ import lombok.Getter;
*
* @author Yinan Li
*/
-public class TaskState extends WorkUnitState {
+public class TaskState extends WorkUnitState implements TaskProgress {
// Built-in metric names
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index b6cc3b5..dced320 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -200,7 +200,12 @@ public class EmbeddedGobblin {
* will appear first in the classpath. Default priority is 0.
*/
public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> klazz, int priority) {
- return distributeJarWithPriority(ClassUtil.findContainingJar(klazz), priority);
+ String jar = ClassUtil.findContainingJar(klazz);
+ if (jar == null) {
+ log.warn(String.format("Could not find jar for class %s. This is normal in test runs.", klazz));
+ return this;
+ }
+ return distributeJarWithPriority(jar, priority);
}
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
new file mode 100644
index 0000000..d36f68c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.io.IOException;
+
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.JobState;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link FiniteStateMachine} implementation to track the state of a Gobblin job executor.
+ */
+@Slf4j
+public class GobblinJobFiniteStateMachine extends FiniteStateMachine<GobblinJobFiniteStateMachine.JobFSMState> {
+
+ /**
+ * Types of state the job can be in.
+ */
+ public enum StateType {
+ PREPARING, RUNNING, INTERRUPTED, CANCELLED, SUCCESS, FAILED
+ }
+
+ /**
+ * State of a job. Equality and hash code are based on the {@link StateType} only.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @EqualsAndHashCode(of = "stateType")
+ @ToString
+ @Getter
+ public static class JobFSMState {
+ private final StateType stateType;
+ }
+
+ /**
+ * A special {@link JobFSMState} that is aware of how to interrupt a running job.
+ */
+ private class RunnableState extends JobFSMState implements StateWithCallbacks<JobFSMState> {
+ private final JobInterruptionPredicate jobInterruptionPredicate;
+
+ public RunnableState() {
+ super(StateType.RUNNING);
+ if (GobblinJobFiniteStateMachine.this.interruptGracefully == null) { // no graceful-interrupt hook: no predicate service
+ this.jobInterruptionPredicate = null;
+ } else {
+ this.jobInterruptionPredicate = new JobInterruptionPredicate(GobblinJobFiniteStateMachine.this.jobState,
+ GobblinJobFiniteStateMachine.this::interruptRunningJob, false); // autoStart=false: started in onEnterState
+ }
+ }
+
+ @Override
+ public void onEnterState(@Nullable JobFSMState previousState) {
+ if (this.jobInterruptionPredicate != null) {
+ this.jobInterruptionPredicate.startAsync(); // begin periodic predicate checks while in RUNNING
+ }
+ }
+
+ @Override
+ public void onLeaveState(JobFSMState nextState) {
+ if (this.jobInterruptionPredicate != null) {
+ this.jobInterruptionPredicate.stopAsync(); // stop checking once RUNNING is left
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o); // same equality as JobFSMState (state type only)
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ /**
+ * A runnable that allows for {@link IOException}s.
+ */
+ @FunctionalInterface
+ public interface RunnableWithIoException {
+ void run() throws IOException;
+ }
+
+ private final JobState jobState;
+ private final RunnableWithIoException interruptGracefully;
+ private final RunnableWithIoException killJob;
+
+ @lombok.Builder
+ private GobblinJobFiniteStateMachine(JobState jobState, RunnableWithIoException interruptGracefully,
+ RunnableWithIoException killJob) {
+ super(buildAllowedTransitions(), Sets.newHashSet(new JobFSMState(StateType.CANCELLED)), new JobFSMState(StateType.FAILED),
+ new JobFSMState(StateType.PREPARING));
+
+ if (jobState == null) {
+ throw new IllegalArgumentException("Job state is required.");
+ }
+
+ this.jobState = jobState;
+ this.interruptGracefully = interruptGracefully; // may be null; RunnableState then creates no interruption predicate
+ this.killJob = killJob; // NOTE(review): not null-checked here, yet interruptRunningJob calls killJob.run() — confirm callers always set it
+ }
+
+ /**
+ * Callers should use this method to obtain the {@link JobFSMState} for a particular {@link StateType}, as the
+ * {@link JobFSMState} might contain additional functionality like running other services, etc.
+ * @param stateType the type of state to create
+ * @return a new {@link RunnableState} for {@link StateType#RUNNING}; otherwise a new plain {@link JobFSMState} of the given type
+ */
+ public JobFSMState getEndStateForType(StateType stateType) {
+ switch (stateType) {
+ case RUNNING:
+ return new RunnableState();
+ default:
+ return new JobFSMState(stateType);
+ }
+ }
+
+ private void interruptRunningJob() {
+ log.info("Interrupting job execution.");
+ try (FiniteStateMachine<JobFSMState>.Transition transition = startTransition(getEndStateForType(StateType.INTERRUPTED))) {
+ try {
+ this.interruptGracefully.run();
+ } catch (IOException ioe) {
+ transition.changeEndState(getEndStateForType(StateType.FAILED)); // graceful interrupt failed: end in FAILED instead
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+ log.error("Cannot interrupt job.", exc);
+ } catch (InterruptedException | FailedTransitionCallbackException exc) {
+ log.error("Cannot finish graceful job interruption. Killing job.", exc);
+ try {
+ this.killJob.run(); // NOTE(review): throws NullPointerException if no killJob was supplied to the builder — confirm
+ } catch (IOException ioe) {
+ log.error("Failed to kill job.", ioe);
+ }
+ if (exc instanceof FailedTransitionCallbackException) {
+ ((FailedTransitionCallbackException) exc).getTransition().switchEndStateToErrorState();
+ ((FailedTransitionCallbackException) exc).getTransition().closeWithoutCallbacks();
+ }
+ }
+ }
+
+ private static SetMultimap<JobFSMState, JobFSMState> buildAllowedTransitions() {
+ SetMultimap<JobFSMState, JobFSMState> transitions = HashMultimap.create(); // key: from-state, values: allowed to-states
+ transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.RUNNING));
+ transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.FAILED));
+ transitions.put(new JobFSMState(StateType.PREPARING), new JobFSMState(StateType.INTERRUPTED));
+ transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.SUCCESS));
+ transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.FAILED));
+ transitions.put(new JobFSMState(StateType.RUNNING), new JobFSMState(StateType.INTERRUPTED));
+ return transitions;
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
new file mode 100644
index 0000000..6f67657
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.ReflectivePredicateEvaluator;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * This class evaluates a predicate on the {@link JobProgress} of a job and calls a job interruption hook when the
+ * predicate is satisfied.
+ * <p>The predicate is a SQL string read from the {@link #INTERRUPTION_SQL} job property; it is scheduled with a 30-second delay.</p>
+ * It is used to preemptively stop jobs after they satisfy some completion predicate (e.g. more than 15 minutes have
+ * elapsed and at least 75% of tasks have finished).
+ */
+@Slf4j
+public class JobInterruptionPredicate extends AbstractScheduledService {
+
+ public static final String INTERRUPTION_SQL = "org.apache.gobblin.jobInterruptionPredicate.sql";
+
+ private final String sql;
+ private final ReflectivePredicateEvaluator evaluator;
+ private final JobProgress jobProgress;
+ private final Runnable jobInterruptionHook;
+
+ public JobInterruptionPredicate(JobState jobState, Runnable jobInterruptionHook, boolean autoStart) {
+ this(jobState, jobState.getProp(INTERRUPTION_SQL), jobInterruptionHook, autoStart);
+ }
+
+ protected JobInterruptionPredicate(JobProgress jobProgress, String predicate,
+ Runnable jobInterruptionHook, boolean autoStart) {
+ this.sql = predicate;
+
+ ReflectivePredicateEvaluator tmpEval = null;
+ if (this.sql != null) {
+ try {
+ tmpEval = new ReflectivePredicateEvaluator(this.sql, JobProgress.class, TaskProgress.class);
+ } catch (SQLException exc) {
+ log.warn("Job interruption predicate is invalid, will not preemptively interrupt job.", exc); // evaluator stays null
+ }
+ }
+ this.evaluator = tmpEval;
+ this.jobProgress = jobProgress;
+ this.jobInterruptionHook = jobInterruptionHook;
+
+ if (autoStart && this.sql != null) { // also starts when the SQL was invalid; runOneIteration then stops the service
+ startAsync();
+ }
+ }
+
+ @Override
+ protected void runOneIteration() {
+ if (this.evaluator == null) { // no predicate, or it failed to build: nothing to evaluate
+ stopAsync();
+ return;
+ }
+ switch (this.jobProgress.getState()) {
+ case PENDING:
+ return; // job not running yet: check again on the next iteration
+ case RUNNING:
+ try {
+ List<Object> objects = Stream.concat(Stream.<Object>of(this.jobProgress), this.jobProgress.getTaskProgress().stream()).collect(
+ Collectors.toList()); // the job progress followed by each task's progress
+ if (this.evaluator.evaluate(objects)) {
+ log.info("Interrupting job due to satisfied job interruption predicate. Predicate: " + this.sql);
+ this.jobInterruptionHook.run();
+ stopAsync(); // the hook is invoked once; no further checks are needed
+ }
+ } catch (Throwable exc) {
+ log.warn("Failed to evaluate job interruption predicate. Will not preemptively interrupt job.", exc);
+ throw Throwables.propagate(exc); // NOTE(review): rethrowing presumably fails the scheduled service, ending further checks — confirm
+ }
+ break;
+ default:
+ log.info(String.format("Detected job finished with state %s. Stopping job interruption predicate.",
+ this.jobProgress.getState()));
+ stopAsync();
+ }
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(30, 30, TimeUnit.SECONDS); // arguments: initial delay 30, delay 30, in seconds
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
new file mode 100644
index 0000000..ad1308a
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.List;
+
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * Type used to retrieve the progress of a Gobblin job.
+ *
+ * Instances are handed to a {@link org.apache.gobblin.util.ReflectivePredicateEvaluator}, which exposes every
+ * zero-argument method of this interface as a column of a table named after the interface.
+ */
+public interface JobProgress {
+
+ // NOTE(review): the exact semantics and units of the accessors below (e.g. the unit of getElapsedTime) are not
+ // defined in this file - confirm against the implementing class.
+ String getJobId();
+ int getTaskCount();
+ int getCompletedTasks();
+ long getElapsedTime();
+ // JobInterruptionPredicate skips evaluation while this is PENDING, evaluates while RUNNING and stops otherwise.
+ JobState.RunningState getState();
+
+ // Progress objects of the job's tasks; JobInterruptionPredicate passes them to the evaluator with this object.
+ List<? extends TaskProgress> getTaskProgress();
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
new file mode 100644
index 0000000..1ec8d61
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * Interface used to retrieve the progress of a task in a Gobblin job.
+ *
+ * Instances are handed to a {@link org.apache.gobblin.util.ReflectivePredicateEvaluator}, which exposes every
+ * zero-argument method of this interface as a column of a table named after the interface.
+ */
+public interface TaskProgress {
+
+ String getJobId();
+ String getTaskId();
+ // Enum-typed columns are exposed to SQL as strings by the evaluator (e.g. workingState = 'FAILED').
+ WorkUnitState.WorkingState getWorkingState();
+ boolean isCompleted();
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index fc0e197..6b6bc16 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,8 +144,12 @@ public class LocalJobLauncher extends AbstractJobLauncher {
}
});
+ Thread thisThread = Thread.currentThread();
+ JobInterruptionPredicate jobInterruptionPredicate =
+ new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(), true);
GobblinMultiTaskAttempt.runWorkUnits(this.jobContext, workUnitsWithJobState, this.taskStateTracker,
this.taskExecutor, GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+ jobInterruptionPredicate.stopAsync();
if (this.cancellationRequested) {
// Wait for the cancellation execution if it has been requested
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1f53054..529fe11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,6 +27,9 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +85,8 @@ import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.MetricGroup;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -110,6 +115,9 @@ import org.apache.gobblin.util.SerializationUtils;
*/
public class MRJobLauncher extends AbstractJobLauncher {
+ private static final String INTERRUPT_JOB_FILE_NAME = "_INTERRUPT_JOB";
+ private static final String GOBBLIN_JOB_INTERRUPT_PATH_KEY = "gobblin.jobInterruptPath";
+
private static final Logger LOG = LoggerFactory.getLogger(MRJobLauncher.class);
private static final String JOB_NAME_PREFIX = "Gobblin-";
@@ -148,6 +156,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
private final StateStore<TaskState> taskStateStore;
private final int jarFileMaximumRetry;
+ private final Path interruptPath;
+ private final GobblinJobFiniteStateMachine fsm;
public MRJobLauncher(Properties jobProps) throws Exception {
this(jobProps, null);
@@ -170,6 +180,9 @@ public class MRJobLauncher extends AbstractJobLauncher {
throws Exception {
super(jobProps, metadataTags);
+ this.fsm = GobblinJobFiniteStateMachine.builder().jobState(jobContext.getJobState())
+ .interruptGracefully(this::interruptGracefully).killJob(this::killJob).build();
+
this.conf = conf;
// Put job configuration properties into the Hadoop configuration so they are available in the mappers
JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps, this.conf);
@@ -185,6 +198,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.mrJobDir = new Path(
new Path(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), this.jobContext.getJobName()),
this.jobContext.getJobId());
+ this.interruptPath = new Path(this.mrJobDir, INTERRUPT_JOB_FILE_NAME);
if (this.fs.exists(this.mrJobDir)) {
LOG.warn("Job working directory already exists for job " + this.jobContext.getJobName());
this.fs.delete(this.mrJobDir, true);
@@ -252,26 +266,35 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService.startAsync().awaitRunning();
LOG.info("Launching Hadoop MR job " + this.job.getJobName());
- this.job.submit();
- this.hadoopJobSubmitted = true;
+ try (FiniteStateMachine<JobFSMState>.Transition t = this.fsm.startTransition(this.fsm.getEndStateForType(StateType.RUNNING))) {
+ try {
+ this.job.submit();
+ } catch (Throwable exc) {
+ t.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+ throw exc;
+ }
+ this.hadoopJobSubmitted = true;
- // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
- if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
- jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+ // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
+ if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+ jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+ LOG.error("Cannot start MR job.", unallowed);
}
- TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
- LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
- this.job.waitForCompletion(true);
- mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));
+ if (this.fsm.getCurrentState().getStateType().equals(StateType.RUNNING)) {
+ TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
+ LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
- if (this.cancellationRequested) {
- // Wait for the cancellation execution if it has been requested
- synchronized (this.cancellationExecution) {
- if (this.cancellationExecuted) {
- return;
- }
- }
+ this.job.waitForCompletion(true);
+ this.fsm.transitionIfAllowed(fsm.getEndStateForType(StateType.SUCCESS));
+
+ mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));
+ }
+
+ if (this.fsm.getCurrentState().getStateType().equals(StateType.CANCELLED)) {
+ return;
}
// Create a metrics set for this job run from the Hadoop counters.
@@ -286,18 +309,52 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
protected void executeCancellation() {
- try {
- if (this.hadoopJobSubmitted && !this.job.isComplete()) {
- LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
- this.job.killJob();
- // Collect final task states.
- this.taskStateCollectorService.stopAsync().awaitTerminated();
+ try (FiniteStateMachine<JobFSMState>.Transition transition =
+ this.fsm.startTransition(this.fsm.getEndStateForType(StateType.CANCELLED))) {
+ if (transition.getStartState().getStateType().equals(StateType.RUNNING)) {
+ try {
+ killJob();
+ } catch (IOException ioe) {
+ LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId());
+ transition.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+ }
+ }
+ } catch (GobblinJobFiniteStateMachine.FailedTransitionCallbackException exc) {
+ exc.getTransition().switchEndStateToErrorState();
+ exc.getTransition().closeWithoutCallbacks();
+ } catch (FiniteStateMachine.UnallowedTransitionException | InterruptedException exc) {
+ LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+ }
+ }
+
+ /**
+ * Attempt a graceful interruption of the running job.
+ */
+ private void interruptGracefully() throws IOException {
+ LOG.info("Attempting graceful interruption of job " + this.jobContext.getJobId());
+
+ this.fs.createNewFile(this.interruptPath);
+
+ long waitTimeStart = System.currentTimeMillis();
+ while (!this.job.isComplete() && System.currentTimeMillis() < waitTimeStart + 30 * 1000) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ break;
}
- } catch (IllegalStateException ise) {
- LOG.error("The Hadoop MR job has not started for job " + this.jobContext.getJobId());
- } catch (IOException ioe) {
- LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId());
}
+
+ if (!this.job.isComplete()) {
+ LOG.info("Interrupted job did not shut itself down after timeout. Killing job.");
+ this.job.killJob();
+ }
+ }
+
+ private void killJob() throws IOException {
+ LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
+ this.job.killJob();
+ // Collect final task states.
+ this.taskStateCollectorService.stopAsync().awaitTerminated();
}
/**
@@ -381,6 +438,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
}
+ this.job.getConfiguration().set(GOBBLIN_JOB_INTERRUPT_PATH_KEY, this.interruptPath.toString());
+
mrJobSetupTimer.stop();
}
@@ -679,6 +738,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
public void run(Context context) throws IOException, InterruptedException {
this.setup(context);
+
+ Path interruptPath = new Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
+ if (this.fs.exists(interruptPath)) {
+ LOG.info(String.format("Found interrupt path %s indicating the driver has interrupted the job, aborting mapper.", interruptPath));
+ return;
+ }
+
GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
try {
// De-serialize and collect the list of WorkUnits to run
@@ -701,7 +767,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
gobblinMultiTaskAttempt =
GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
- multiTaskAttemptCommitPolicy, jobBroker);
+ multiTaskAttemptCommitPolicy, jobBroker, (gmta) -> {
+ try {
+ return this.fs.exists(interruptPath);
+ } catch (IOException ioe) {
+ return false;
+ }
+ });
if (this.isSpeculativeEnabled) {
LOG.info("will not commit in task attempt");
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index f2cd04b..9a798c1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -15,40 +15,6 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.gobblin.runtime.spec_catalog;
import java.io.Closeable;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
new file mode 100644
index 0000000..ead087d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+
+import lombok.Data;
+
+
+/**
+ * A predicate evaluator that uses one or more interfaces to define table schemas and can evaluate SQL statements on
+ * instances of those interfaces. See {@link ReflectivePredicateEvaluatorTest} for examples.
+ *
+ * Note all evaluated statements must return a single boolean column and at least one row; when several rows are
+ * returned, their values are ANDed together.
+ *
+ * Usage:
+ * ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator("SELECT ... FROM myInterface", MyInterface.class);
+ * evaluator.evaluate(instance1, instance2, ...); // use the statement provided in constructor
+ * -- or --
+ * evaluator.evaluate("SELECT ... FROM myInterface", instance1, instance2, ...);
+ */
+public class ReflectivePredicateEvaluator implements Closeable {
+ private static final String REFERENCE_INTERFACES = "refInterface";
+ private static final String OPERATOR_ID = "operatorId";
+ private static final Pattern FIELD_NAME_EXTRACTOR = Pattern.compile("(?:get([A-Z]))?(.+)");
+
+ private static final String MODEL_PATTERN = "{"
+ + "version: 1, defaultSchema: 'MAIN',"
+ + "schemas: ["
+ + "{name: 'MAIN', type: 'custom', factory: '%s', operand: {%s: '%s', %s: '%d'}}"
+ + "]}";
+ private static final String CONNECT_STRING_PATTERN = "jdbc:calcite:model=inline:%s";
+
+ private static final Cache<Integer, ReflectivePredicateEvaluator> REGISTRY = CacheBuilder.newBuilder().weakValues().build();
+ private static final AtomicInteger IDENTIFIER = new AtomicInteger();
+
+ private final List<Class<?>> referenceInterfaces;
+ private final int identifier;
+ private final Connection conn;
+ private final PreparedStatement stmnt;
+
+ private final String sql;
+
+ private volatile List<Object> objects;
+
+ /**
+ * Registers this evaluator, opens a Calcite connection whose schema is derived from the reference interfaces, and
+ * prepares and validates the default statement.
+ *
+ * @param sql The default SQL expression to run in this evaluator.
+ * @param referenceInterfaces The interfaces that will be used to generate the table schemas.
+ * @throws SQLException if the connection cannot be opened or the statement cannot be prepared.
+ * @throws IllegalArgumentException if the statement does not return a single boolean column.
+ */
+ public ReflectivePredicateEvaluator(String sql, Class<?>... referenceInterfaces) throws SQLException {
+ this.referenceInterfaces = Lists.newArrayList(referenceInterfaces);
+ this.sql = sql;
+
+ // Register before connecting: the tables look this evaluator up by identifier when they are scanned.
+ this.identifier = IDENTIFIER.getAndIncrement();
+ REGISTRY.put(this.identifier, this);
+
+ String model = computeModel();
+ String connectString = String.format(CONNECT_STRING_PATTERN, model);
+
+ this.conn =
+ DriverManager.getConnection(connectString);
+ try {
+ this.stmnt = prepareStatement(sql);
+ } catch (Throwable t) {
+ // The caller never receives this instance on failure, so release the connection here to avoid leaking it.
+ try {
+ this.conn.close();
+ } catch (SQLException closeExc) {
+ t.addSuppressed(closeExc);
+ }
+ throw t;
+ }
+ }
+
+ /**
+ * Prepares {@code sql} on this evaluator's connection and validates its result shape with
+ * {@link #validateSql(PreparedStatement, String)}.
+ *
+ * @return the prepared, validated statement; the caller is responsible for closing it.
+ * @throws SQLException if preparing the statement or reading its metadata fails.
+ */
+ private PreparedStatement prepareStatement(String sql) throws SQLException {
+ PreparedStatement stmnt = null;
+ try {
+ stmnt = this.conn.prepareStatement(sql);
+ validateSql(stmnt, sql);
+ return stmnt;
+ } catch (Throwable t) {
+ // Do not leak a statement that was prepared but failed validation.
+ if (stmnt != null) {
+ stmnt.close();
+ }
+ throw t;
+ }
+ }
+
+ /**
+ * Builds the inline Calcite model string: a single custom schema named MAIN created by {@link PESchemaFactory},
+ * whose operand carries the comma-separated reference interface names and this evaluator's identifier.
+ */
+ private String computeModel() {
+ return String.format(MODEL_PATTERN, PESchemaFactory.class.getName(), REFERENCE_INTERFACES,
+ Joiner.on(",").join(this.referenceInterfaces.stream().map(Class::getName).collect(Collectors.toList())),
+ OPERATOR_ID, this.identifier);
+ }
+
+ /**
+ * Checks that the prepared statement returns exactly one column of SQL type BOOLEAN.
+ *
+ * @param stmnt the prepared statement whose result metadata is inspected.
+ * @param sql the statement text, used only for the error message.
+ * @throws SQLException if the metadata cannot be read.
+ * @throws IllegalArgumentException if the statement does not return a single boolean column.
+ */
+ private void validateSql(PreparedStatement stmnt, String sql) throws SQLException {
+ ResultSetMetaData metaData = stmnt.getMetaData();
+
+ if (metaData.getColumnCount() != 1 || metaData.getColumnType(1) != Types.BOOLEAN) {
+ throw new IllegalArgumentException("Statement is expected to return a single boolean column. Provided statement: " + sql);
+ }
+ }
+
+ /**
+ * Evaluate the default predicate on the list of provided objects.
+ * @param objects the instances made available as table rows to the statement given in the constructor.
+ * @return the result of {@link #evaluate(List, String)} called with a null SQL string.
+ * @throws SQLException
+ */
+ public boolean evaluate(Object... objects) throws SQLException{
+ return evaluate(Lists.newArrayList(objects), null);
+ }
+
+ /**
+ * Evaluate an ad-hoc predicate on the list of provided objects.
+ * Note {@link #evaluate(Object...)} is preferable as it only does validation of the expression once.
+ * @param sql the ad-hoc statement to prepare, validate and run for this call.
+ * @param objects the instances made available as table rows to the statement.
+ * @throws SQLException
+ */
+ public boolean evaluate(String sql, Object... objects) throws SQLException{
+ return evaluate(Lists.newArrayList(objects), sql);
+ }
+
+ /**
+ * Evaluate the default predicate on the list of provided objects.
+ * @param objects the instances made available as table rows to the statement given in the constructor.
+ * @return the result of {@link #evaluate(List, String)} called with a null SQL string.
+ * @throws SQLException
+ */
+ public boolean evaluate(List<Object> objects) throws SQLException {
+ return evaluate(objects, null);
+ }
+
+ /**
+ * Evaluate an ad-hoc predicate on the list of provided objects.
+ * Note {@link #evaluate(List)} is preferable as it only does validation of the expression once.
+ *
+ * @param objects the instances made available as table rows to the statement.
+ * @param sql the ad-hoc statement to run, or null to run the statement given in the constructor.
+ * @return the logical AND of the boolean value of every returned row.
+ * @throws SQLException if preparing or executing the statement fails.
+ * @throws IllegalArgumentException if the statement returns no rows, or an ad-hoc statement does not return a
+ * single boolean column.
+ */
+ public boolean evaluate(List<Object> objects, String sql) throws SQLException {
+ // Evaluations are serialized because the objects to scan are handed to the tables through a shared field.
+ synchronized (this) {
+ String actualSql = sql == null ? this.sql : sql;
+ PreparedStatement actualStmnt = null;
+ try {
+ actualStmnt = sql == null ? this.stmnt : prepareStatement(sql);
+
+ this.objects = objects;
+ actualStmnt.execute();
+ // Close the result set once consumed so the reused default statement does not keep it open.
+ try (ResultSet rs = actualStmnt.getResultSet()) {
+ if (!rs.next()) {
+ throw new IllegalArgumentException("Expected at least one returned row. SQL evaluated: " + actualSql);
+ }
+ boolean result = true;
+ do {
+ result &= rs.getBoolean(1);
+ } while (rs.next());
+ return result;
+ }
+ } finally {
+ // Only ad-hoc statements are closed here; the default statement is owned by this evaluator.
+ if (sql != null && actualStmnt != null) {
+ actualStmnt.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes the default statement and the underlying connection.
+ *
+ * @throws IOException wrapping any {@link SQLException} raised while closing.
+ */
+ @Override
+ public void close()
+ throws IOException {
+ try {
+ try {
+ if (this.stmnt != null) {
+ this.stmnt.close();
+ }
+ } finally {
+ // Close the connection even when closing the statement fails.
+ if (this.conn != null) {
+ this.conn.close();
+ }
+ }
+ } catch (SQLException exc) {
+ throw new IOException("Failed to close " + ReflectivePredicateEvaluator.class.getSimpleName(), exc);
+ }
+ }
+
+ /**
+ * Calcite {@link SchemaFactory} used for the evaluator.
+ * This class is public because Calcite uses reflection to instantiate it, there is no reason to use it anywhere else
+ * in Gobblin.
+ *
+ * The created schema contains one table per reference interface, named after the upper-cased simple name of the
+ * interface.
+ */
+ public static class PESchemaFactory implements SchemaFactory {
+
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ try {
+ // The operand carries the comma-separated interface names and the identifier of the owning evaluator.
+ List<Class<?>> referenceInterfaces = new ArrayList<>();
+ for (String iface : Splitter.on(",").splitToList(operand.get(REFERENCE_INTERFACES).toString())) {
+ referenceInterfaces.add(Class.forName(iface));
+ }
+ int operatorIdentifier = Integer.parseInt(operand.get(OPERATOR_ID).toString());
+
+ return new AbstractSchema() {
+ @Override
+ protected Map<String, Table> getTableMap() {
+ HashMap<String, Table> map = new HashMap<>();
+ for (Class<?> iface : referenceInterfaces) {
+ map.put(iface.getSimpleName().toUpperCase(),
+ new PETable(iface, operatorIdentifier));
+ }
+ return map;
+ }
+ };
+ } catch (ReflectiveOperationException roe) {
+ // A reference interface could not be loaded by name.
+ throw new RuntimeException(roe);
+ }
+ }
+ }
+
+ @Data
+ private static class PETable extends AbstractTable implements ProjectableFilterableTable {
+ private final Class<?> referenceInterface;
+ private final int operatorIdentifier;
+ private volatile boolean initialized = false;
+
+ private RelDataType rowType;
+ private List<Function<Object, Object>> methodsForFields = new ArrayList<>();
+
+ /**
+ * Produces the rows of this table from the objects currently set on the owning evaluator: every object that is an
+ * instance of the reference interface becomes one row holding the projected column values.
+ * The {@code filters} argument is not used by this method.
+ */
+ @Override
+ public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) {
+ // Look up the owning evaluator by identifier to reach the objects of the evaluation in progress.
+ List<Object> list = REGISTRY.getIfPresent(this.operatorIdentifier).objects;
+
+ final int[] actualProjects = resolveProjects(projects);
+
+ Enumerator<Object[]> enumerator = Linq4j.enumerator(list.stream()
+ .filter(o -> referenceInterface.isAssignableFrom(o.getClass()))
+ .map(
+ m -> {
+ Object[] res = new Object[actualProjects.length];
+ for (int i = 0; i < actualProjects.length; i++) {
+ res[i] = methodsForFields.get(actualProjects[i]).apply(m);
+ }
+ return res;
+ }
+ ).collect(Collectors.toList()));
+
+ return new AbstractEnumerable<Object[]>() {
+ @Override
+ public Enumerator<Object[]> enumerator() {
+ return enumerator;
+ }
+ };
+ }
+
+ /**
+ * Returns the column indices to project: the given array, or all columns in order when {@code projects} is null.
+ */
+ private int[] resolveProjects(int[] projects) {
+ if (projects == null) {
+ projects = new int[methodsForFields.size()];
+ for (int i = 0; i < projects.length; i++) {
+ projects[i] = i;
+ }
+ }
+ return projects;
+ }
+
+ /**
+ * Returns the row type of this table, computing it from the reference interface on first use.
+ * The type factory is cast to {@link JavaTypeFactory}.
+ */
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ initialize((JavaTypeFactory) typeFactory);
+ return this.rowType;
+ }
+
+ /**
+ * Builds, once, the row type and the per-column value extractors from the zero-argument methods of the reference
+ * interface. Column names are the upper-cased field names computed from the method names.
+ */
+ private synchronized void initialize(JavaTypeFactory typeFactory) {
+ if (this.initialized) {
+ return;
+ }
+
+ this.methodsForFields = new ArrayList<>();
+ List<RelDataTypeField> fields = new ArrayList<>();
+
+ for (Method method : this.referenceInterface.getMethods()) {
+ if (method.getParameterCount() == 0) {
+ String fieldName = computeFieldName(method.getName());
+ if (fieldName != null) {
+ // The extractor list and the field list are filled together so their indices line up.
+ this.methodsForFields.add(extractorForMethod(method));
+ Class<?> retType = method.getReturnType();
+ // Enum-typed columns are declared as strings.
+ if (retType.isEnum()) {
+ retType = String.class;
+ }
+ fields.add(new RelDataTypeFieldImpl(fieldName.toUpperCase(), fields.size(), typeFactory.createType(retType)));
+ }
+ }
+ }
+
+ this.rowType = new MyDataType(fields, referenceInterface);
+ this.initialized = true;
+ }
+
+ /**
+ * Builds a function that reflectively invokes {@code method} on a row object and returns the column value.
+ * Values of enum-typed methods are converted to their string form; a null value is returned as null.
+ *
+ * @throws RuntimeException (from the returned function) wrapping any reflective invocation failure.
+ */
+ private Function<Object, Object> extractorForMethod(Method method) {
+ final boolean returnsEnum = method.getReturnType().isEnum();
+ return o -> {
+ try {
+ Object ret = method.invoke(o);
+ // A null enum value becomes a null column value instead of failing with a NullPointerException.
+ return returnsEnum && ret != null ? ret.toString() : ret;
+ } catch (ReflectiveOperationException roe) {
+ throw new RuntimeException(roe);
+ }
+ };
+ }
+
+ }
+
+ /**
+ * Row type for a {@link PETable}: a struct of the given fields whose type string is the fully qualified name of
+ * the reference interface.
+ */
+ private static class MyDataType extends RelDataTypeImpl {
+ private final String typeString;
+
+ public MyDataType(List<? extends RelDataTypeField> fieldList, Class<?> refInterface) {
+ super(fieldList);
+ this.typeString = refInterface.getName();
+ // typeString must be set before the digest is computed, since generateTypeString reads it.
+ computeDigest();
+ }
+
+ @Override
+ protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+ sb.append(typeString);
+ }
+ }
+
+ /**
+ * Computes the field name for a method name: for a getter of the form {@code getXyz} the {@code get} prefix is
+ * dropped ({@code getJobId} gives {@code JobId}); any other name is used unchanged ({@code isCompleted} gives
+ * {@code isCompleted}).
+ *
+ * @return the field name, or null if the method name does not match the extractor pattern.
+ */
+ private static String computeFieldName(String methodName) {
+ Matcher matcher = FIELD_NAME_EXTRACTOR.matcher(methodName);
+ if (matcher.matches()) {
+ // Group 1 is null when the name has no "get" prefix; concatenating it would prepend the literal "null".
+ String firstLetter = matcher.group(1);
+ return firstLetter == null ? matcher.group(2) : firstLetter + matcher.group(2);
+ }
+ return null;
+ }
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
new file mode 100644
index 0000000..21625a8
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * Tests for {@link JobInterruptionPredicate}. Each test builds the predicate with autoStart disabled and drives it by
+ * calling {@code runOneIteration()} directly, checking whether the interruption hook has run.
+ */
+public class JobInterruptionPredicateTest {
+
+ /** The hook fires only once a predicate over the job progress table becomes true. */
+ @Test
+ public void testJobPredicate() {
+ SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING, new ArrayList<>());
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+ JobInterruptionPredicate predicate =
+ new JobInterruptionPredicate(jobProgress, "SELECT completedTasks > 5 FROM jobProgress", () -> atomicBoolean.set(true), false);
+
+ predicate.runOneIteration();
+ Assert.assertFalse(atomicBoolean.get());
+
+ jobProgress.completedTasks = 6;
+
+ predicate.runOneIteration();
+ Assert.assertTrue(atomicBoolean.get());
+ }
+
+ /** The hook fires only once a predicate over the task progress table becomes true. */
+ @Test
+ public void testTaskPredicate() {
+ SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+ SettableTaskProgress t2 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+
+ SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+ Lists.newArrayList(t1, t2));
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+ JobInterruptionPredicate predicate =
+ new JobInterruptionPredicate(jobProgress, "SELECT count(*) > 0 FROM taskProgress WHERE workingState = 'FAILED'", () -> atomicBoolean.set(true), false);
+
+ predicate.runOneIteration();
+ Assert.assertFalse(atomicBoolean.get());
+
+ t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+ predicate.runOneIteration();
+ Assert.assertTrue(atomicBoolean.get());
+ }
+
+ /** A predicate combining the job and task tables fires when either sub-condition holds, and not otherwise. */
+ @Test
+ public void testTaskAndJobPredicate() {
+ SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+ SettableTaskProgress t2 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+
+ SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+ Lists.newArrayList(t1, t2));
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+ JobInterruptionPredicate predicate =
+ new JobInterruptionPredicate(jobProgress,
+ "SELECT EXISTS(SELECT * FROM (SELECT completedTasks > 5 AS pred FROM jobProgress UNION SELECT count(*) > 0 AS pred FROM taskProgress WHERE workingState = 'FAILED') WHERE pred)",
+ () -> atomicBoolean.set(true), false);
+
+ predicate.runOneIteration();
+ Assert.assertFalse(atomicBoolean.get());
+
+ t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+ predicate.runOneIteration();
+ Assert.assertTrue(atomicBoolean.get());
+ // Reset the flag so the following checks observe only later hook invocations.
+ atomicBoolean.set(false);
+
+ t2.workingState = WorkUnitState.WorkingState.RUNNING;
+
+ predicate.runOneIteration();
+ Assert.assertFalse(atomicBoolean.get());
+
+ jobProgress.completedTasks = 6;
+
+ predicate.runOneIteration();
+ Assert.assertTrue(atomicBoolean.get());
+ }
+
+ /** {@link JobProgress} implementation whose non-final fields the tests mutate directly. */
+ @Getter
+ @AllArgsConstructor
+ public static class SettableJobProgress implements JobProgress {
+ private final String jobId;
+ private int taskCount;
+ private int completedTasks;
+ private long elapsedTime;
+ private JobState.RunningState runningState;
+ private List<TaskProgress> taskProgress;
+
+ @Override
+ public JobState.RunningState getState() {
+ return this.runningState;
+ }
+ }
+
+ /** {@link TaskProgress} implementation whose non-final fields the tests mutate directly. */
+ @Getter
+ @AllArgsConstructor
+ public static class SettableTaskProgress implements TaskProgress {
+ private final String jobId;
+ private final String taskId;
+ private WorkUnitState.WorkingState workingState;
+ private boolean isCompleted;
+ }
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
new file mode 100644
index 0000000..ba7dc4b
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+
+/**
+ * Tests for {@link ReflectivePredicateEvaluator}: SQL predicates are evaluated against objects exposed through the
+ * getters of the registered interfaces (here {@code myInterface} and {@code myInterface2}).
+ */
+public class ReflectivePredicateEvaluatorTest {
+
+  /**
+   * Single-row predicates, using both the SQL given at construction and SQL passed to {@code evaluate}.
+   */
+  @Test
+  public void simpleTest() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT anInt = 1 FROM myInterface", MyInterface.class);
+
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+
+    Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(1, "bar")));
+    Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(2, "foo")));
+    Assert.assertFalse(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(2, "bar")));
+  }
+
+  /**
+   * An aggregate predicate is computed over all objects passed to a single {@code evaluate} call.
+   */
+  @Test
+  public void testWithAggregations() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT sum(anInt) = 5 FROM myInterface", MyInterface.class);
+
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation(4, "foo")));
+  }
+
+  /**
+   * A WHERE clause restricts which objects contribute to the aggregate.
+   */
+  @Test
+  public void testWithAggregationsAndFilter() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT sum(anInt) = 5 FROM myInterface WHERE aString = 'foo'", MyInterface.class);
+
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"),
+        new MyImplementation(4, "bar")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"),
+        new MyImplementation(4, "foo")));
+  }
+
+  /**
+   * Objects of different registered interfaces are routed to their own tables within one query.
+   */
+  @Test
+  public void testMultipleInterfaces() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT true = ALL (SELECT sum(anInt) = 2 AS satisfied FROM myInterface UNION SELECT sum(anInt) = 3 AS satisfied FROM myInterface2)",
+        MyInterface.class, MyInterface2.class);
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation2(3)));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation2(3),
+        new MyImplementation(1, "foo")));
+  }
+
+  /**
+   * When the query yields one row per object, the result is true only if every row is true (as asserted below).
+   */
+  @Test
+  public void testMultipleOutputs() throws Exception {
+    ReflectivePredicateEvaluator evaluator =
+        new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface", MyInterface.class);
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(1, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(2, "foo")));
+  }
+
+  /**
+   * A query whose output is not a boolean must be rejected at construction time.
+   */
+  @Test
+  public void testInvalidSQL() throws Exception {
+    try {
+      // Only the constructor's validation is under test, so the instance is not kept.
+      new ReflectivePredicateEvaluator("SELECT anInt FROM myInterface", MyInterface.class);
+      Assert.fail();
+    } catch (IllegalArgumentException exc) {
+      // Expected
+    }
+  }
+
+  /**
+   * A valid query that produces no rows for the given objects must make {@code evaluate} fail.
+   */
+  @Test
+  public void testNoOutputs() throws Exception {
+    // Built outside the try block: an IllegalArgumentException from the constructor must fail the test rather than
+    // be mistaken for the expected failure of evaluate().
+    ReflectivePredicateEvaluator evaluator =
+        new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface WHERE aString = 'foo'",
+            MyInterface.class);
+    try {
+      evaluator.evaluate(new MyImplementation(1, "bar"));
+      Assert.fail();
+    } catch (IllegalArgumentException exc) {
+      // Expected
+    }
+  }
+
+  /** First interface exposed to the evaluator; queried as {@code myInterface} in the tests above. */
+  private interface MyInterface {
+    int getAnInt();
+    String getAString();
+  }
+
+  /** Immutable {@link MyInterface} implementation; Lombok {@code @Data} supplies constructor and getters. */
+  @Data
+  private static class MyImplementation implements MyInterface {
+    private final int anInt;
+    private final String aString;
+  }
+
+  /** Second interface exposed to the evaluator; queried as {@code myInterface2} in the tests above. */
+  private interface MyInterface2 {
+    int getAnInt();
+  }
+
+  /** Immutable {@link MyInterface2} implementation; Lombok {@code @Data} supplies constructor and getters. */
+  @Data
+  private static class MyImplementation2 implements MyInterface2 {
+    private final int anInt;
+  }
+
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
new file mode 100644
index 0000000..20a2bf8
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of a basic FiniteStateMachine that allows keeping track of its state and gating certain
+ * logic on whether a transition is valid or not.
+ *
+ * This class is useful in situations where logic is complex, possibly multi-threaded, and can take multiple paths. Certain
+ * pieces of logic (for example running a job, publishing a dataset, etc) can only happen if other actions ended correctly,
+ * and the FSM is a way of simplifying the encoding and verification of those conditions. It is understood that state
+ * transitions may not be instantaneous, and that other state transitions should not start until the current one has
+ * been resolved.
+ *
+ * All public methods of this class will wait until the FSM is in a non-transitioning state. If multiple transitions are
+ * queued at the same time, the order in which they are executed is essentially random.
+ *
+ * The states supported by FSM can be enums or instances of any base type. The legality of a transition is determined
+ * by equality, i.e. if a transition A -> B is legal, the current state is A' and the desired end state is B', the transition
+ * will be legal if A.equals(A') && B.equals(B'). This allows for storing additional information into the current state
+ * as long as it does not affect the equality check (i.e. fields that are not compared in the equals check can store
+ * state metadata, etc.).
+ *
+ * Suggested Usage:
+ * FiniteStateMachine<MySymbols> fsm = new FiniteStateMachine.Builder<MySymbols>().addTransition(START_SYMBOL, END_SYMBOL).build(initialSymbol);
+ *
+ * try (Transition transition = fsm.startTransition(MY_END_STATE)) {
+ * try {
+ * // my logic
+ * } catch (MyException exc) {
+ * transition.changeEndState(MY_ERROR);
+ * }
+ * } catch (UnallowedTransitionException exc) {
+ * // Cannot execute logic because it's an illegal transition!
+ * } catch (ReentrantStableStateWait exc) {
+ * // Somewhere in the logic an instruction tried to do an operation with the fsm that would likely cause a deadlock
+ * } catch (AbandonedTransitionException exc) {
+ * // Another thread initiated a transition and became inactive ending the transition
+ * } catch (InterruptedException exc) {
+ * // Could not start transition because thread got interrupted while waiting for a non-transitioning state
+ * } catch (FailedTransitionCallbackException exc) {
+ * // A callback in the transition start or end states has failed.
+ * exc.getTransition().changeEndState(MY_ERROR).closeWithoutCallbacks(); // example handling
+ * }
+ *
+ * @param <T>
+ */
+@Slf4j
+public class FiniteStateMachine<T> {
+
+ /**
+ * Used to build a {@link FiniteStateMachine} instance.
+ */
+ public static class Builder<T> {
+ private final SetMultimap<T, T> allowedTransitions;
+ private final Set<T> universalEnds;
+ private T errorState;
+
+ public Builder() {
+ this.allowedTransitions = HashMultimap.create();
+ this.universalEnds = new HashSet<>();
+ }
+
+ /**
+ * Add a legal transition to the {@link FiniteStateMachine}.
+ */
+ public Builder<T> addTransition(T startState, T endState) {
+ this.allowedTransitions.put(startState, endState);
+ return this;
+ }
+
+ /**
+ * Specify that a state is a valid end state for a transition starting from any state. Useful for example for
+ * error states.
+ */
+ public Builder<T> addUniversalEnd(T state) {
+ this.universalEnds.add(state);
+ return this;
+ }
+
+ /**
+ * Specify the error state to which this machine can transition if nothing else is possible. Note the error state
+ * is always an allowed end state.
+ */
+ public Builder<T> errorState(T state) {
+ this.errorState = state;
+ return this;
+ }
+
+ /**
+ * Build a {@link FiniteStateMachine} starting at the given initial state.
+ */
+ public FiniteStateMachine<T> build(T initialState) {
+ return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, initialState);
+ }
+ }
+
+ // Legal (start state -> end states) pairs, consulted by isAllowedTransition.
+ private final SetMultimap<T, T> allowedTransitions;
+ // States accepted as the end of a transition regardless of the start state.
+ private final Set<T> universalEnds;
+ // Error state, always accepted as an end state. Null when the builder never set one.
+ private final T errorState;
+
+ // Taken around state reads (read lock) and around starting/closing a transition (write lock).
+ private final ReentrantReadWriteLock lock;
+ // Condition of the write lock; signalled when a transition is closed.
+ private final Condition condition;
+ // State the machine was constructed with; used by cloneAtInitialState.
+ private final T initialState;
+
+ private volatile T currentState;
+ // Non-null while a transition is open; reset to null when the transition is closed.
+ private volatile Transition currentTransition;
+
+  /**
+   * Create a machine positioned at {@code initialState}. If the initial state implements
+   * {@link StateWithCallbacks}, its {@code onEnterState} callback is invoked with a null previous state.
+   *
+   * @param allowedTransitions legal start state to end state pairs.
+   * @param universalEnds states that are legal end states from any start state.
+   * @param errorState state that is always a legal end state; may be null.
+   * @param initialState the starting state of the machine.
+   */
+  protected FiniteStateMachine(SetMultimap<T, T> allowedTransitions, Set<T> universalEnds, T errorState, T initialState) {
+    this.lock = new ReentrantReadWriteLock();
+    this.condition = this.lock.writeLock().newCondition();
+
+    this.allowedTransitions = allowedTransitions;
+    this.universalEnds = universalEnds;
+    this.errorState = errorState;
+
+    this.initialState = initialState;
+    this.currentState = initialState;
+
+    // There is no previous state when the machine is created, hence the null argument.
+    if (initialState instanceof StateWithCallbacks) {
+      ((StateWithCallbacks) initialState).onEnterState(null);
+    }
+  }
+
+  /**
+   * Start a transition to the end state specified. The returned {@link Transition} object is a closeable that will finalize
+   * the transition when it is closed. While the transition is open, no other transition can start.
+   *
+   * It is recommended to call this method only within a try-with-resource block to ensure the transition is closed.
+   *
+   * @param endState the state the machine should be in once the returned transition is closed.
+   * @return the handle controlling the newly opened transition.
+   * @throws UnallowedTransitionException If the transition is not allowed.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws ReentrantStableStateWait if the calling thread already owns the currently open transition.
+   * @throws AbandonedTransitionException if the thread owning the currently open transition is no longer alive.
+   */
+  public Transition startTransition(T endState) throws UnallowedTransitionException, InterruptedException {
+    // Acquire the lock before entering the try block so that the finally clause can never attempt to unlock a lock
+    // this thread failed to acquire.
+    this.lock.writeLock().lock();
+    try {
+      // await() releases the write lock while waiting and re-acquires it before returning or throwing.
+      while (isTransitioning()) {
+        this.condition.await();
+      }
+      if (!isAllowedTransition(this.currentState, endState)) {
+        throw new UnallowedTransitionException(this.currentState, endState);
+      }
+      Transition transition = new Transition(endState);
+      this.currentTransition = transition;
+      return transition;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Move to the given end state in one step: opens a transition with {@link #startTransition(Object)} and closes it
+   * right away with {@link Transition#close()}.
+   *
+   * @param endState the desired end state.
+   * @throws UnallowedTransitionException if the transition is not allowed.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws FailedTransitionCallbackException if a state callback fails while closing the transition.
+   */
+  public void transitionImmediately(T endState) throws UnallowedTransitionException, InterruptedException, FailedTransitionCallbackException {
+    startTransition(endState).close();
+  }
+
+  /**
+   * Attempt an immediate transition to the given end state, reporting rather than throwing when it is illegal.
+   *
+   * @param endState the desired end state.
+   * @return true if the transition happened, false if it was not an allowed transition.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws FailedTransitionCallbackException if a state callback fails while closing the transition.
+   */
+  public boolean transitionIfAllowed(T endState) throws InterruptedException, FailedTransitionCallbackException {
+    try {
+      transitionImmediately(endState);
+      return true;
+    } catch (UnallowedTransitionException notAllowed) {
+      return false;
+    }
+  }
+
+ /**
+ * Get the current state. This method will wait until the FSM is in a non-transitioning state (although a transition
+ * may start immediately after).
+ * @return the state of the machine once no transition is open.
+ * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+ * @throws ReentrantStableStateWait if the calling thread owns the currently open transition.
+ * @throws AbandonedTransitionException if the thread owning the currently open transition is no longer alive.
+ */
+ public T getCurrentState() throws InterruptedException {
+ try {
+ // Need to get lock to make sure we're not in transitioning state.
+ this.lock.readLock().lock();
+
+ // May temporarily trade the read lock for the write lock in order to wait on the condition.
+ waitForNonTransitioningReadLock();
+
+ return this.currentState;
+ } finally {
+ // Relies on the helper above returning with the read lock held.
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Read the current state without taking any lock and without waiting for an open transition to finish.
+ * While a transition is open this is still the start state of that transition.
+ */
+ @VisibleForTesting
+ T getCurrentStateEvenIfTransitioning() {
+ return this.currentState;
+ }
+
+ /**
+ * The clone is constructed with the same transition collections, error state and initial state as this FSM. It
+ * does not consult the current state or take any lock.
+ *
+ * @return A clone of this FSM starting at the initial state of the FSM.
+ */
+ public FiniteStateMachine<T> cloneAtInitialState() {
+ return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, this.initialState);
+ }
+
+ /**
+ * Waits for a non-transitioning state before reading the current state. The current state of this FSM becomes
+ * the initial state of the clone.
+ *
+ * @return A clone of this FSM starting at the current state of the FSM.
+ * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+ */
+ public FiniteStateMachine<T> cloneAtCurrentState() throws InterruptedException {
+ try {
+ this.lock.readLock().lock();
+
+ // May temporarily trade the read lock for the write lock in order to wait on the condition.
+ waitForNonTransitioningReadLock();
+
+ return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, this.currentState);
+ } finally {
+ // Relies on the helper above returning with the read lock held.
+ this.lock.readLock().unlock();
+ }
+ }
+
+  /**
+   * Waits for a read lock in a non-transitioning state. The caller MUST hold the read lock before calling this method.
+   *
+   * The read lock is guaranteed to be held again when this method exits, whether it returns normally or throws
+   * after the wait has started, so callers can unconditionally release it in a finally block.
+   *
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws ReentrantStableStateWait if the calling thread owns the currently open transition.
+   * @throws AbandonedTransitionException if the thread owning the currently open transition is no longer alive.
+   */
+  private void waitForNonTransitioningReadLock() throws InterruptedException {
+    if (!isTransitioning()) {
+      return;
+    }
+
+    // The condition belongs to the write lock, and a read lock cannot be upgraded in place, so release it first.
+    this.lock.readLock().unlock();
+    this.lock.writeLock().lock();
+    try {
+      while (isTransitioning()) {
+        this.condition.await();
+      }
+    } finally {
+      // Downgrade back to the read lock even when the wait was interrupted or isTransitioning() threw. Otherwise
+      // the caller's finally block would unlock a read lock it no longer holds and the resulting
+      // IllegalMonitorStateException would mask the original exception.
+      this.lock.readLock().lock();
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Check whether a transition is currently open.
+   *
+   * @return true if another live thread owns an open transition, false if no transition is open.
+   * @throws ReentrantStableStateWait if the open transition is owned by the calling thread.
+   * @throws AbandonedTransitionException if the thread owning the open transition is no longer alive.
+   */
+  private boolean isTransitioning() {
+    Transition active = this.currentTransition;
+    if (active == null) {
+      return false;
+    }
+    if (Thread.currentThread().equals(active.ownerThread)) {
+      throw new ReentrantStableStateWait(
+          "Tried to check for non-transitioning state from a thread that had already initiated a transition, "
+              + "this may indicate a deadlock. To change end state use Transition.changeEndState() instead.");
+    }
+    if (!active.ownerThread.isAlive()) {
+      throw new AbandonedTransitionException(active.ownerThread);
+    }
+    return true;
+  }
+
+  /**
+   * Decide whether moving from {@code startState} to {@code endState} is legal. The error state and the universal
+   * end states are legal from any start state; anything else must have been registered as an explicit transition.
+   *
+   * @param startState the state the transition would start from.
+   * @param endState the state the transition would end in.
+   * @return true if the transition is allowed.
+   */
+  protected boolean isAllowedTransition(T startState, T endState) {
+    boolean legalFromAnywhere = endState.equals(this.errorState) || this.universalEnds.contains(endState);
+    if (legalFromAnywhere) {
+      return true;
+    }
+    Set<T> reachable = this.allowedTransitions.get(startState);
+    return reachable != null && reachable.contains(endState);
+  }
+
+ /**
+ * A handle used for controlling the transition of the {@link FiniteStateMachine}. Note if this handle is lost the
+ * {@link FiniteStateMachine} will likely go into an invalid state.
+ */
+ public class Transition implements Closeable {
+ private final Thread ownerThread;
+ private volatile T endState;
+ private volatile boolean closed;
+
+ private Transition(T endState) {
+ this.ownerThread = Thread.currentThread();
+ this.endState = endState;
+ this.closed = false;
+ }
+
+ /**
+ * Get the state at the beginning of this transition.
+ */
+ public T getStartState() {
+ if (this.closed) {
+ throw new IllegalStateException("Transition already closed.");
+ }
+ return FiniteStateMachine.this.currentState;
+ }
+
+ /**
+ * Change the end state of the transition. The new end state must be a legal transition for the state when the
+ * {@link Transition} was created.
+ *
+ * @throws UnallowedTransitionException if the new end state is not an allowed transition.
+ */
+ public synchronized void changeEndState(T endState) throws UnallowedTransitionException {
+ if (this.closed) {
+ throw new IllegalStateException("Transition already closed.");
+ }
+ if (!isAllowedTransition(FiniteStateMachine.this.currentState, endState)) {
+ throw new UnallowedTransitionException(FiniteStateMachine.this.currentState, endState);
+ }
+ this.endState = endState;
+ }
+
+ /**
+ * Change the end state of the transition to the FSM error state.
+ */
+ public synchronized void switchEndStateToErrorState() {
+ this.endState = FiniteStateMachine.this.errorState;
+ }
+
+ /**
+ * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks.
+ *
+ * @throws FailedTransitionCallbackException when start or end state callbacks fail. Note if this exception is thrown
+ * the transition is not complete and the error must be handled to complete it.
+ */
+ @Override
+ public void close() throws FailedTransitionCallbackException {
+ doClose(true);
+ }
+
+ /**
+ * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks without
+ * calling any callbacks. This method should only be called after a {@link #close()} has failed and the failure
+ * cannot be handled.
+ */
+ public void closeWithoutCallbacks() {
+ try {
+ doClose(false);
+ } catch (FailedTransitionCallbackException exc) {
+ throw new IllegalStateException(String.format("Close without callbacks threw a %s. This is an error in code.",
+ FailedTransitionCallbackException.class), exc);
+ }
+ }
+
+ private synchronized void doClose(boolean withCallbacks) throws FailedTransitionCallbackException {
+ if (this.closed) {
+ return;
+ }
+
+ try {
+ FiniteStateMachine.this.lock.writeLock().lock();
+
+ try {
+ if (withCallbacks && getStartState() instanceof StateWithCallbacks) {
+ ((StateWithCallbacks<T>) getStartState()).onLeaveState(this.endState);
+ }
+ } catch (Throwable t) {
+ throw new FailedTransitionCallbackException(this, FailedCallback.START_STATE, t);
+ }
+
+ try {
+ if (withCallbacks && this.endState instanceof StateWithCallbacks) {
+ ((StateWithCallbacks) this.endState).onEnterState(getStartState());
+ }
+ } catch (Throwable t) {
+ throw new FailedTransitionCallbackException(this, FailedCallback.END_STATE, t);
+ }
+
+ this.closed = true;
+
+ FiniteStateMachine.this.currentState = this.endState;
+ FiniteStateMachine.this.currentTransition = null;
+ FiniteStateMachine.this.condition.signalAll();
+ } finally {
+ FiniteStateMachine.this.lock.writeLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * Thrown when a requested transition is not allowed. Carries the start and end states of the rejected
+ * transition, exposed through Lombok-generated getters.
+ */
+ @Getter
+ public static class UnallowedTransitionException extends Exception {
+ private final Object startState;
+ private final Object endState;
+
+ /**
+ * @param startState the state the rejected transition would have started from.
+ * @param endState the state the rejected transition would have ended in.
+ */
+ public UnallowedTransitionException(Object startState, Object endState) {
+ super(String.format("Unallowed transition: %s -> %s", startState, endState));
+ this.startState = startState;
+ this.endState = endState;
+ }
+ }
+
+ /**
+ * Thrown when a thread that has started a transition is waiting for a non-transitioning state, which is a deadlock situation.
+ * Unchecked, so it is not declared by the methods that can raise it.
+ */
+ public static class ReentrantStableStateWait extends RuntimeException {
+ /**
+ * @param message description of the offending operation.
+ */
+ public ReentrantStableStateWait(String message) {
+ super(message);
+ }
+ }
+
+  /**
+   * Thrown when a transition was initiated by a thread that no longer exists, likely implying that the transition can
+   * never be closed. The thread that started the transition is available through {@code getStartingThread()}.
+   */
+  @Getter
+  public static class AbandonedTransitionException extends RuntimeException {
+    // Exposed through the Lombok getter so handlers can identify the thread; previously stored but unreadable.
+    private final Thread startingThread;
+
+    /**
+     * @param startingThread the thread that opened the transition and is no longer alive.
+     */
+    public AbandonedTransitionException(Thread startingThread) {
+      super(String.format("Thread %s initiated a transition but became inactive before closing it.", startingThread));
+      this.startingThread = startingThread;
+    }
+  }
+
+ /**
+ * Identifies which callback failed while closing a transition: {@code START_STATE} for the leave callback of the
+ * start state, {@code END_STATE} for the enter callback of the end state.
+ */
+ public enum FailedCallback {
+ START_STATE, END_STATE
+ }
+
+ /**
+ * Thrown when the callbacks when closing a transition fail. The transition that could not be closed, the callback
+ * that failed and the original failure are exposed through Lombok-generated getters.
+ */
+ @Getter
+ public static class FailedTransitionCallbackException extends IOException {
+ private final FiniteStateMachine.Transition transition;
+ private final FailedCallback failedCallback;
+ // Also passed to the superclass constructor as the cause.
+ private final Throwable originalException;
+
+ /**
+ * @param transition the transition whose close failed.
+ * @param failedCallback which of the two callbacks failed.
+ * @param originalException the throwable raised by the callback.
+ */
+ public FailedTransitionCallbackException(FiniteStateMachine<?>.Transition transition, FailedCallback failedCallback,
+ Throwable originalException) {
+ super("Failed callbacks when ending transition.", originalException);
+ this.transition = transition;
+ this.failedCallback = failedCallback;
+ this.originalException = originalException;
+ }
+ }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
new file mode 100644
index 0000000..9a77a7d
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * A state for a {@link FiniteStateMachine} which supports callbacks when entering and leaving the state.
+ * Both callbacks default to no-ops, so implementations override only the ones they need.
+ * @param <T> supertype of states in the FSM.
+ */
+public interface StateWithCallbacks<T> {
+
+ /**
+ * Called when an FSM reaches this state.
+ * @param previousState the previous state of the machine, or null when there is none (the
+ * {@link FiniteStateMachine} constructor passes null for the initial state).
+ */
+ default void onEnterState(@Nullable T previousState) {
+ // do nothing
+ }
+
+ /**
+ * Called when an FSM leaves this state.
+ * @param nextState the next state of the machine.
+ */
+ default void onLeaveState(T nextState) {
+ // do nothing
+ }
+
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
new file mode 100644
index 0000000..ec50d02
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class FiniteStateMachineTest {
+
+ /** States of the enum-based machine used by most tests in this class. */
+ public enum MyStates {
+ PENDING, RUNNING, SUCCESS, ERROR
+ }
+
+ // Reference machine: PENDING -> RUNNING -> SUCCESS, with ERROR reachable from PENDING and RUNNING.
+ // Tests work on clones of it rather than transitioning it directly.
+ private final FiniteStateMachine<MyStates> refFsm = new FiniteStateMachine.Builder<MyStates>()
+ .addTransition(MyStates.PENDING, MyStates.RUNNING)
+ .addTransition(MyStates.RUNNING, MyStates.SUCCESS)
+ .addTransition(MyStates.PENDING, MyStates.ERROR)
+ .addTransition(MyStates.RUNNING, MyStates.ERROR).build(MyStates.PENDING);
+
+ @Test
+ public void singleThreadImmediateTransitionsTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ fsm.transitionImmediately(MyStates.RUNNING);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+ fsm.transitionImmediately(MyStates.SUCCESS);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.SUCCESS);
+
+ fsm = fsm.cloneAtInitialState();
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ fsm.transitionImmediately(MyStates.ERROR);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+
+ fsm = fsm.cloneAtCurrentState();
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+ }
+
+  /**
+   * Illegal transitions must be rejected and must leave the machine in its previous state.
+   */
+  @Test
+  public void illegalTransitionsTest() throws Exception {
+    FiniteStateMachine<MyStates> machine = refFsm.cloneAtInitialState();
+    Assert.assertEquals(machine.getCurrentState(), MyStates.PENDING);
+
+    // PENDING -> PENDING was never registered
+    try {
+      machine.transitionImmediately(MyStates.PENDING);
+      Assert.fail();
+    } catch (FiniteStateMachine.UnallowedTransitionException rejected) {
+      // expected
+    }
+    Assert.assertEquals(machine.getCurrentState(), MyStates.PENDING);
+
+    // PENDING -> SUCCESS skips RUNNING
+    try {
+      machine.transitionImmediately(MyStates.SUCCESS);
+      Assert.fail();
+    } catch (FiniteStateMachine.UnallowedTransitionException rejected) {
+      // expected
+    }
+    Assert.assertEquals(machine.getCurrentState(), MyStates.PENDING);
+
+    // The machine is still usable after the rejected attempts
+    machine.transitionImmediately(MyStates.RUNNING);
+    Assert.assertEquals(machine.getCurrentState(), MyStates.RUNNING);
+  }
+
+ @Test
+ public void slowTransitionsTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ try (FiniteStateMachine.Transition transition = fsm.startTransition(MyStates.RUNNING)) {
+ try {
+ fsm.getCurrentState();
+ Assert.fail();
+ } catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+ // Expected because the same thread that is transitioning tries to read the current state
+ }
+ try {
+ fsm.transitionImmediately(MyStates.RUNNING);
+ Assert.fail();
+ } catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+ // Expected because the same thread that is transitioning tries to start another transition
+ }
+ }
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+
+ try (FiniteStateMachine<MyStates>.Transition transition = fsm.startTransition(MyStates.SUCCESS)) {
+ transition.changeEndState(MyStates.ERROR);
+ }
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+ }
+
+ @Test
+ public void callbackTest() throws Exception {
+ NamedStateWithCallback stateA = new NamedStateWithCallback("a");
+ NamedStateWithCallback stateB = new NamedStateWithCallback("b");
+ NamedStateWithCallback stateC = new NamedStateWithCallback("c", null, s -> {
+ throw new RuntimeException("leave");
+ });
+ NamedStateWithCallback stateD = new NamedStateWithCallback("d");
+
+ FiniteStateMachine<NamedStateWithCallback> fsm = new FiniteStateMachine.Builder<NamedStateWithCallback>()
+ .addTransition(new NamedStateWithCallback("a"), new NamedStateWithCallback("b"))
+ .addTransition(new NamedStateWithCallback("b"), new NamedStateWithCallback("c"))
+ .addTransition(new NamedStateWithCallback("c"), new NamedStateWithCallback("d"))
+ .addUniversalEnd(new NamedStateWithCallback("ERROR"))
+ .build(stateA);
+
+ fsm.transitionImmediately(stateB);
+
+ Assert.assertEquals(fsm.getCurrentState(), stateB);
+ Assert.assertEquals(stateA.lastTransition, "leave:a->b");
+ stateA.lastTransition = "";
+ Assert.assertEquals(stateB.lastTransition, "enter:a->b");
+ stateB.lastTransition = "";
+
+ try {
+ // State that will error on enter
+ fsm.transitionImmediately(new NamedStateWithCallback("c", s -> {
+ throw new RuntimeException("enter");
+ }, s -> {
+ throw new RuntimeException("leave");
+ }));
+ Assert.fail("Expected excpetion");
+ } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+ Assert.assertEquals(exc.getFailedCallback(), FiniteStateMachine.FailedCallback.END_STATE);
+ Assert.assertEquals(exc.getOriginalException().getMessage(), "enter");
+ // switch state to one that will only error on leave
+ exc.getTransition().changeEndState(stateC);
+ exc.getTransition().close();
+ }
+
+ Assert.assertEquals(fsm.getCurrentState(), stateC);
+ Assert.assertEquals(stateB.lastTransition, "leave:b->c");
+ stateB.lastTransition = "";
+ Assert.assertEquals(stateC.lastTransition, "enter:b->c");
+ stateC.lastTransition = "";
+
+ try {
+ fsm.transitionImmediately(stateD);
+ Assert.fail("Expected exception");
+ } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+ Assert.assertEquals(exc.getFailedCallback(), FiniteStateMachine.FailedCallback.START_STATE);
+ Assert.assertEquals(exc.getOriginalException().getMessage(), "leave");
+ // switch state to one that will only error on leave
+ exc.getTransition().changeEndState(new NamedStateWithCallback("ERROR"));
+ exc.getTransition().closeWithoutCallbacks();
+ }
+
+ Assert.assertEquals(fsm.getCurrentState(), new NamedStateWithCallback("ERROR"));
+ Assert.assertEquals(stateD.lastTransition, "");
+ }
+
+ /**
+ * Two threads queue transitions on the same machine; the second may only start once the first has closed.
+ * Each Transitioner holds its transition open until its thread is interrupted.
+ */
+ @Test(timeOut = 5000)
+ public void multiThreadTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+ Transitioner<MyStates> t1 = new Transitioner<>(fsm, MyStates.RUNNING);
+ Transitioner<MyStates> t2 = new Transitioner<>(fsm, MyStates.ERROR);
+
+ // t1 opens PENDING -> RUNNING and keeps it open; the visible state is still PENDING.
+ Thread t1Thread = new Thread(null, t1, "t1");
+ t1Thread.start();
+ t1.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+ Assert.assertEquals(t1.transitionResult, TransitionState.TRANSITIONING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.PENDING);
+
+ // t2 cannot open its transition while t1's is open, so it stays in STARTING.
+ Thread t2Thread = new Thread(null, t2, "t2");
+ t2Thread.start();
+ Assert.assertEquals(t1.transitionResult, TransitionState.TRANSITIONING);
+ Assert.assertEquals(t2.transitionResult, TransitionState.STARTING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.PENDING);
+
+ // Interrupting t1 makes it finish its transition, which lets t2 open RUNNING -> ERROR.
+ t1Thread.interrupt();
+ t1.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+ t2.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+ Assert.assertEquals(t1.transitionResult, TransitionState.COMPLETED);
+ Assert.assertEquals(t2.transitionResult, TransitionState.TRANSITIONING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.RUNNING);
+
+ // Interrupting t2 makes it finish as well, leaving the machine in ERROR.
+ t2Thread.interrupt();
+ t2.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+ Assert.assertEquals(t1.transitionResult, TransitionState.COMPLETED);
+ Assert.assertEquals(t2.transitionResult, TransitionState.COMPLETED);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), MyStates.ERROR);
+ }
+
+  /**
+   * Checks what happens when a thread starts a transition and then terminates without closing it:
+   * the next attempt to transition the machine is expected to fail with
+   * {@link FiniteStateMachine.AbandonedTransitionException}.
+   */
+  @Test(timeOut = 5000)
+  public void deadThreadTest() throws Exception {
+    FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+    Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+    Thread t = new Thread(() -> {
+      try {
+        // The returned transition is deliberately discarded and never closed.
+        fsm.startTransition(MyStates.RUNNING);
+      } catch (FiniteStateMachine.UnallowedTransitionException | InterruptedException exc) {
+        // do nothing
+      }
+      // since we don't close the transition, it should become orphaned
+    });
+    t.start();
+
+    // Block until the thread has terminated; the TestNG timeout bounds the wait.
+    t.join();
+
+    try {
+      fsm.transitionImmediately(MyStates.RUNNING);
+      Assert.fail("Expected AbandonedTransitionException");
+    } catch (FiniteStateMachine.AbandonedTransitionException exc) {
+      // Expected
+    }
+  }
+
+  /**
+   * Test helper that opens a transition to {@code endState} on the given state machine, holds it open
+   * until its thread is interrupted (or 2 seconds elapse), and then closes it. Progress is published
+   * through {@link #transitionResult}; other threads can block on a change with {@link #awaitState(Set)}.
+   *
+   * @param <T> type of the states of the state machine under test
+   */
+  @Data
+  private class Transitioner<T> implements Runnable {
+    private final FiniteStateMachine<T> fsm;
+    private final T endState;
+
+    // Guard and condition used to hand state changes over to threads blocked in awaitState.
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    private volatile boolean running = false;
+    private volatile TransitionState transitionResult = TransitionState.STARTING;
+
+    /**
+     * Starts the transition, reports TRANSITIONING, and sleeps until interrupted. The try-with-resources
+     * block closes the transition on every exit path. The final state is COMPLETED on the normal
+     * (interrupted) path, TIMEOUT if nobody interrupted the sleep, or one of the failure states.
+     */
+    @Override
+    public void run() {
+      try (FiniteStateMachine.Transition transition = this.fsm.startTransition(this.endState)) {
+        goToState(TransitionState.TRANSITIONING);
+        try {
+          Thread.sleep(2000);
+          // Publish through goToState so that threads blocked in awaitState are woken up as well.
+          goToState(TransitionState.TIMEOUT);
+          return;
+        } catch (InterruptedException ie) {
+          // This is the signal to end the state transition, so do nothing
+        }
+      } catch (InterruptedException exc) {
+        goToState(TransitionState.INTERRUPTED);
+        return;
+      } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+        goToState(TransitionState.UNALLOWED);
+        return;
+      } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+        goToState(TransitionState.CALLBACK_ERROR);
+        return;
+      }
+      goToState(TransitionState.COMPLETED);
+    }
+
+    /**
+     * Blocks the caller until {@link #transitionResult} is one of {@code states}.
+     *
+     * @param states the states to wait for
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void awaitState(Set<TransitionState> states) throws InterruptedException {
+      // Acquire before the try block so that unlock() only runs when the lock is actually held.
+      this.lock.lock();
+      try {
+        while (!states.contains(this.transitionResult)) {
+          this.condition.await();
+        }
+      } finally {
+        this.lock.unlock();
+      }
+    }
+
+    /**
+     * Records {@code state} as the current result and wakes every thread waiting in {@link #awaitState(Set)}.
+     *
+     * @param state the new value of {@link #transitionResult}
+     */
+    public void goToState(TransitionState state) {
+      // Acquire before the try block so that unlock() only runs when the lock is actually held.
+      this.lock.lock();
+      try {
+        this.transitionResult = state;
+        this.condition.signalAll();
+      } finally {
+        this.lock.unlock();
+      }
+    }
+  }
+
+  /** Progress markers reported by a Transitioner while it attempts its transition. */
+  enum TransitionState {
+    /** Initial value, before the transition has been started. */
+    STARTING,
+    /** The transition was started and is being held open. */
+    TRANSITIONING,
+    /** The transition block was left normally. */
+    COMPLETED,
+    /** An InterruptedException escaped the transition block. */
+    INTERRUPTED,
+    /** The state machine rejected the transition with an UnallowedTransitionException. */
+    UNALLOWED,
+    /** The Transitioner's sleep elapsed without its thread being interrupted. */
+    TIMEOUT,
+    /** A FailedTransitionCallbackException was raised. */
+    CALLBACK_ERROR
+  }
+
+ @RequiredArgsConstructor
+ @EqualsAndHashCode(of = "name")
+ public static class NamedStateWithCallback implements StateWithCallbacks<NamedStateWithCallback> {
+ @Getter
+ private final String name;
+ private final Function<NamedStateWithCallback, Void> enterCallback;
+ private final Function<NamedStateWithCallback, Void> leaveCallback;
+
+ String lastTransition = "";
+
+ public NamedStateWithCallback(String name) {
+ this(name, null, null);
+ }
+
+ private void setLastTransition(String callback, NamedStateWithCallback start, NamedStateWithCallback end) {
+ this.lastTransition = String.format("%s:%s->%s", callback, start == null ? "null" : start.name, end.name);
+ }
+
+ @Override
+ public void onEnterState(@Nullable NamedStateWithCallback previousState) {
+ if (this.enterCallback == null) {
+ setLastTransition("enter", previousState, this);
+ } else {
+ this.enterCallback.apply(previousState);
+ }
+ }
+
+ @Override
+ public void onLeaveState(NamedStateWithCallback nextState) {
+ if (this.leaveCallback == null) {
+ setLastTransition("leave", this, nextState);
+ } else {
+ this.leaveCallback.apply(nextState);
+ }
+ }
+ }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 8509cc6..e772acd 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -25,7 +25,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
.register(new BuildProperty("nexusArtifactSnapshotRepository", "https://repository.apache.org/content/repositories/snapshots", "Maven repository to publish artifacts"))
.register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies version"))
.register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies version"))
- .register(new BuildProperty("bytemanVersion", "2.2.1", "Byteman dependencies version"))
+ .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman dependencies version"))
.register(new BuildProperty("confluentVersion", "2.0.1", "confluent dependencies version"))
.register(new BuildProperty("doNotSignArtifacts", false, "Do not sight Maven artifacts"))
.register(new BuildProperty("gobblinFlavor", "standard", "Build flavor (see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index f7ff29c..0009397 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -121,8 +121,8 @@ ext.externalDependency = [
"bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
"bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
"bcprovJdk15on": "org.bouncycastle:bcprov-jdk15on:1.52",
- "calciteCore": "org.apache.calcite:calcite-core:1.3.0-incubating",
- "calciteAvatica": "org.apache.calcite:calcite-avatica:1.3.0-incubating",
+ "calciteCore": "org.apache.calcite:calcite-core:1.16.0",
+ "calciteAvatica": "org.apache.calcite:calcite-avatica:1.13.0",
"jhyde": "org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde",
"curatorFramework": "org.apache.curator:curator-framework:2.10.0",
"curatorRecipes": "org.apache.curator:curator-recipes:2.10.0",
diff --git a/gradle/scripts/globalDependencies.gradle b/gradle/scripts/globalDependencies.gradle
index d64db67..270a121 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -43,6 +43,7 @@ subprojects {
// Required to add JDK's tool jar, which is required to run byteman tests.
testCompile (files(((URLClassLoader) ToolProvider.getSystemToolClassLoader()).getURLs()))
}
+ all*.exclude group: 'org.apache.calcite', module: 'calcite-avatica' // replaced by org.apache.calcite.avatica:avatica-core
}
}
}