You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/09 18:29:36 UTC
[3/3] flink git commit: [FLINK-5982] [runtime] Refactor
AbstractInvokable and StatefulTask to accept Environment and State in the
constructor.
[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask to accept Environment and State in the constructor.
This is the first steo towards implementing an RAII pattern for all task runtime classes.
This closes #3633
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6033de01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6033de01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6033de01
Branch: refs/heads/master
Commit: 6033de01ae620ebc9735c552ce85ccd1687793d7
Parents: 4c3f607
Author: Tony Wei <to...@gmail.com>
Authored: Wed Mar 15 11:13:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 9 18:30:59 2018 +0100
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 16 +-
.../jobmanager/JMXJobManagerMetricTest.java | 22 +-
.../runtime/checkpoint/CheckpointOptions.java | 5 +-
.../iterative/task/AbstractIterativeTask.java | 12 +
.../iterative/task/IterationHeadTask.java | 12 +
.../task/IterationIntermediateTask.java | 14 +
.../task/IterationSynchronizationSinkTask.java | 12 +
.../iterative/task/IterationTailTask.java | 14 +
.../jobgraph/tasks/AbstractInvokable.java | 114 +++++++--
.../runtime/jobgraph/tasks/StatefulTask.java | 88 -------
.../flink/runtime/operators/BatchTask.java | 17 +-
.../flink/runtime/operators/DataSinkTask.java | 16 +-
.../flink/runtime/operators/DataSourceTask.java | 12 +-
.../apache/flink/runtime/taskmanager/Task.java | 255 ++++++++++---------
.../checkpoint/CoordinatorShutdownTest.java | 22 +-
.../client/JobClientActorRecoveryITCase.java | 5 +
.../PartialConsumePipelinedResultTest.java | 9 +
.../jobmanager/JobManagerHARecoveryTest.java | 40 +--
.../runtime/jobmanager/JobManagerTest.java | 17 +-
.../SlotCountExceedingParallelismTest.java | 23 +-
.../ScheduleOrUpdateConsumersTest.java | 13 +-
.../runtime/operators/DataSinkTaskTest.java | 28 +-
.../runtime/operators/DataSourceTaskTest.java | 8 +-
.../InvokableClassConstructorTest.java | 67 +++++
.../operators/chaining/ChainTaskTest.java | 18 +-
.../chaining/ChainedAllReduceDriverTest.java | 4 +-
.../operators/testutils/DummyInvokable.java | 12 +
.../operators/testutils/TaskTestBase.java | 34 +--
.../BackPressureStatsTrackerITCase.java | 5 +
.../runtime/taskexecutor/TaskExecutorTest.java | 5 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 27 +-
.../TaskCancelAsyncProducerConsumerITCase.java | 9 +
.../runtime/taskmanager/TaskManagerTest.java | 13 +
.../flink/runtime/taskmanager/TaskTest.java | 46 +++-
.../testtasks/BlockingNoOpInvokable.java | 5 +
.../testtasks/FailingBlockingInvokable.java | 10 +
.../flink/runtime/testtasks/NoOpInvokable.java | 5 +
.../runtime/testtasks/WaitingNoOpInvokable.java | 5 +
.../runtime/testutils/StoppableInvokable.java | 9 +-
.../runtime/util/JvmExitOnFatalErrorTest.java | 5 +
.../apache/flink/runtime/jobmanager/Tasks.scala | 43 +++-
.../streaming/runtime/io/BarrierBuffer.java | 6 +-
.../streaming/runtime/io/BarrierTracker.java | 8 +-
.../runtime/io/CheckpointBarrierHandler.java | 4 +-
.../runtime/io/StreamInputProcessor.java | 4 +-
.../runtime/io/StreamTwoInputProcessor.java | 4 +-
.../runtime/tasks/OneInputStreamTask.java | 34 +++
.../runtime/tasks/SourceStreamTask.java | 8 +
.../tasks/StoppableSourceStreamTask.java | 6 +
.../runtime/tasks/StreamIterationHead.java | 8 +
.../runtime/tasks/StreamIterationTail.java | 6 +
.../streaming/runtime/tasks/StreamTask.java | 64 +++--
.../runtime/tasks/TwoInputStreamTask.java | 14 +
.../operators/async/AsyncWaitOperatorTest.java | 27 +-
.../api/streamtask/StreamIterationHeadTest.java | 4 +-
.../io/BarrierBufferAlignmentLimitTest.java | 6 +-
.../streaming/runtime/io/BarrierBufferTest.java | 26 +-
.../runtime/io/BarrierTrackerTest.java | 11 +-
.../runtime/operators/StreamTaskTimerTest.java | 17 +-
.../TestProcessingTimeServiceTest.java | 12 +-
...kpointExceptionHandlerConfigurationTest.java | 23 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 72 +++---
.../tasks/OneInputStreamTaskTestHarness.java | 30 ++-
.../runtime/tasks/RestoreStreamTaskTest.java | 47 +---
.../SourceExternalCheckpointTriggerTest.java | 10 +-
.../tasks/SourceStreamTaskStoppingTest.java | 9 +-
.../runtime/tasks/SourceStreamTaskTest.java | 17 +-
.../StreamTaskCancellationBarrierTest.java | 22 +-
.../tasks/StreamTaskTerminationTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 51 +++-
.../runtime/tasks/StreamTaskTestHarness.java | 58 ++++-
.../tasks/StreamTaskTestHarnessTest.java | 6 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 6 +
.../runtime/tasks/TwoInputStreamTaskTest.java | 26 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 20 +-
.../test/example/client/JobRetrievalITCase.java | 5 +
.../runtime/NetworkStackThroughputITCase.java | 13 +
.../manual/MassiveCaseClassSortingITCase.scala | 26 +-
.../JobManagerLeaderSessionIDITCase.scala | 3 +-
79 files changed, 1196 insertions(+), 617 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 71c5e77..8dc504a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -111,9 +111,9 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
@Test
public void testFullyAsyncSnapshot() throws Exception {
- final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@@ -179,6 +179,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
testHarness.invoke(mockEnv);
+ final OneInputStreamTask<String, String> task = testHarness.getTask();
+
// wait for the task to be running
for (Field field: StreamTask.class.getDeclaredFields()) {
if (field.getName().equals("isRunning")) {
@@ -213,13 +215,13 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
/**
* This tests ensures that canceling of asynchronous snapshots works as expected and does not block.
- * @throws Exception
*/
@Test
public void testCancelFullyAsyncCheckpoints() throws Exception {
- final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@@ -278,6 +280,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
testHarness.invoke(mockEnv);
+ final OneInputStreamTask<String, String> task = testHarness.getTask();
+
// wait for the task to be running
for (Field field: StreamTask.class.getDeclaredFields()) {
if (field.getName().equals("isRunning")) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c74db28..c79e3d7 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -120,24 +122,20 @@ public class JMXJobManagerMetricTest {
* Utility to block/unblock a task.
*/
public static class BlockingInvokable extends AbstractInvokable {
- private static boolean blocking = true;
- private static final Object lock = new Object();
+
+ private static final OneShotLatch LATCH = new OneShotLatch();
+
+ public BlockingInvokable(Environment environment) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
- while (blocking) {
- synchronized (lock) {
- lock.wait();
- }
- }
+ LATCH.await();
}
public static void unblock() {
- blocking = false;
-
- synchronized (lock) {
- lock.notifyAll();
- }
+ LATCH.trigger();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 27e5d0c..813c292 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -18,19 +18,20 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.Serializable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
/**
* Options for performing the checkpoint.
*
* <p>The {@link CheckpointProperties} are related and cover properties that
* are only relevant at the {@link CheckpointCoordinator}. These options are
- * relevant at the {@link StatefulTask} instances running on task managers.
+ * relevant at the {@link AbstractInvokable} instances running on task managers.
*/
public class CheckpointOptions implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index a36fc57..19eb3d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Future;
*/
public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
implements Terminable {
+
private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
protected LongSumAggregator worksetAggregator;
@@ -87,6 +88,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
private volatile boolean terminationRequested;
// --------------------------------------------------------------------------------------------
+
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public AbstractIterativeTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Main life cycle methods that implement the iterative behavior
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 2a95a65..1dd3da4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -100,6 +101,17 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
// --------------------------------------------------------------------------------------------
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public IterationHeadTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@Override
protected int getNumTaskInputs() {
// this task has an additional input in the workset case for the initial solution set
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index c5fd133..d1bd5db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -49,6 +50,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public IterationIntermediateTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@Override
protected void initialize() throws Exception {
super.initialize();
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 6a38fcc..14f3612 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
@@ -73,6 +74,17 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
// --------------------------------------------------------------------------------------------
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public IterationSynchronizationSinkTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@Override
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 3ec3a8e..7217bfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
@@ -47,6 +48,19 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public IterationTailTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@Override
protected void initialize() throws Exception {
super.initialize();
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index f63a762..1734d68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -20,22 +20,53 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.BatchTask;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This is the abstract base class for every task that can be executed by a
- * TaskManager. Concrete tasks like the vertices of batch jobs (see
- * {@link BatchTask} inherit from this class.
+ * This is the abstract base class for every task that can be executed by a TaskManager.
+ * Concrete tasks extend this class, for example the streaming and batch tasks.
*
* <p>The TaskManager invokes the {@link #invoke()} method when executing a
* task. All operations of the task happen in this method (setting up input
* output stream readers and writers as well as the task's core operation).
+ *
+ * <p>All classes that extend must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
+ * Tasks that are always stateless can, for convenience, also only implement the constructor
+ * {@code MyTask(Environment)}.
+ *
+ * <p><i>Developer note: While constructors cannot be enforced at compile time, we did not yet venture
+ * on the endeavor of introducing factories (it is only an internal API after all, and with Java 8,
+ * one can use {@code Class::new} almost like a factory lambda.</i>
+ *
+ * <p><b>NOTE:</b> There is no constructor that accepts and initial task state snapshot
+ * and stores it in a variable. That is on purpose, because the AbstractInvokable itself
+ * does not need the state snapshot (only subclasses such as StreamTask do need the state)
+ * and we do not want to store a reference indefinitely, thus preventing cleanup of
+ * the initial state structure by the Garbage Collector.
+ *
+ * <p>Any subclass that supports recoverable state and participates in
+ * checkpointing needs to override {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
+ * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)},
+ * {@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointComplete(long)}.
*/
public abstract class AbstractInvokable {
/** The environment assigned to this invokable. */
- private Environment environment;
+ private final Environment environment;
+
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public AbstractInvokable(Environment environment) {
+ this.environment = checkNotNull(environment);
+ }
/**
* Starts the execution.
@@ -46,7 +77,7 @@ public abstract class AbstractInvokable {
*
* <p>All resources should be cleaned up when the method returns. Make sure
* to guard the code with <code>try-finally</code> blocks where necessary.
- *
+ *
* @throws Exception
* Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
*/
@@ -62,16 +93,6 @@ public abstract class AbstractInvokable {
public void cancel() throws Exception {
// The default implementation does nothing.
}
-
- /**
- * Sets the environment of this task.
- *
- * @param environment
- * the environment of this task
- */
- public final void setEnvironment(Environment environment) {
- this.environment = environment;
- }
/**
* Returns the environment of this task.
@@ -133,4 +154,65 @@ public abstract class AbstractInvokable {
public ExecutionConfig getExecutionConfig() {
return this.environment.getExecutionConfig();
}
+
+ // ------------------------------------------------------------------------
+ // Checkpointing Methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method is called to trigger a checkpoint, asynchronously by the checkpoint
+ * coordinator.
+ *
+ * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
+ * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
+ * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)}
+ * method.
+ *
+ * @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
+ *
+ * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
+ */
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
+ throw new UnsupportedOperationException(String.format("triggerCheckpoint not supported by %s", this.getClass().getName()));
+ }
+
+ /**
+ * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+ * barriers on all input streams.
+ *
+ * @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
+ * @param checkpointMetrics Metrics about this checkpoint
+ *
+ * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+ */
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+ throw new UnsupportedOperationException(String.format("triggerCheckpointOnBarrier not supported by %s", this.getClass().getName()));
+ }
+
+ /**
+ * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+ * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+ *
+ * <p>This requires implementing tasks to forward a
+ * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+ *
+ * @param checkpointId The ID of the checkpoint to be aborted.
+ * @param cause The reason why the checkpoint was aborted during alignment
+ */
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
+ throw new UnsupportedOperationException(String.format("abortCheckpointOnBarrier not supported by %s", this.getClass().getName()));
+ }
+
+ /**
+ * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
+ * the notification from all participating tasks.
+ *
+ * @param checkpointId The ID of the checkpoint that is complete..
+ * @throws Exception The notification method may forward its exceptions.
+ */
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException(String.format("notifyCheckpointComplete not supported by %s", this.getClass().getName()));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
deleted file mode 100644
index 00db01f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-
-/**
- * This interface must be implemented by any invokable that has recoverable state and participates
- * in checkpointing.
- */
-public interface StatefulTask {
-
- /**
- * Sets the initial state of the operator, upon recovery. The initial state is typically
- * a snapshot of the state from a previous execution.
- *
- * @param taskStateHandles All state handle for the task.
- */
- void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception;
-
- /**
- * This method is called to trigger a checkpoint, asynchronously by the checkpoint
- * coordinator.
- *
- * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
- * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
- * receiving checkpoint barriers, invoke the
- * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)} method.
- *
- * @param checkpointMetaData Meta data for about this checkpoint
- * @param checkpointOptions Options for performing this checkpoint
- *
- * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
- */
- boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
-
- /**
- * This method is called when a checkpoint is triggered as a result of receiving checkpoint
- * barriers on all input streams.
- *
- * @param checkpointMetaData Meta data for about this checkpoint
- * @param checkpointOptions Options for performing this checkpoint
- * @param checkpointMetrics Metrics about this checkpoint
- *
- * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
- */
- void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
-
- /**
- * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
- * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
- *
- * <p>This requires implementing tasks to forward a
- * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
- *
- * @param checkpointId The ID of the checkpoint to be aborted.
- * @param cause The reason why the checkpoint was aborted during alignment
- */
- void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
-
- /**
- * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
- * the notification from all participating tasks.
- *
- * @param checkpointId The ID of the checkpoint that is complete..
- * @throws Exception The notification method may forward its exceptions.
- */
- void notifyCheckpointComplete(long checkpointId) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index be81877..e697869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -79,7 +79,7 @@ import java.util.Map;
public class BatchTask<S extends Function, OT> extends AbstractInvokable implements TaskContext<S, OT> {
protected static final Logger LOG = LoggerFactory.getLogger(BatchTask.class);
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -216,6 +216,19 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
private OperatorMetricGroup metrics;
// --------------------------------------------------------------------------------------------
+ // Constructor
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public BatchTask(Environment environment) {
+ super(environment);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Task Interface
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index bb253ab..0ea376e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.util.MutableObjectIterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,12 +56,12 @@ import org.slf4j.LoggerFactory;
* @see OutputFormat
*/
public class DataSinkTask<IT> extends AbstractInvokable {
-
+
// Obtain DataSinkTask Logger
private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
// --------------------------------------------------------------------------------------------
-
+
// OutputFormat instance. volatile, because the asynchronous canceller may access it
private volatile OutputFormat<IT> format;
@@ -83,6 +84,15 @@ public class DataSinkTask<IT> extends AbstractInvokable {
private volatile boolean cleanupCalled;
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public DataSinkTask(Environment environment) {
+ super(environment);
+ }
+
@Override
public void invoke() throws Exception {
// --------------------------------------------------------------------
@@ -298,7 +308,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
LOG.debug(getLogString("Cancelling data sink operator"));
}
-
+
/**
* Initializes the OutputFormat implementation and configuration.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 1437877..cdfe1fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +81,15 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// cancel flag
private volatile boolean taskCanceled = false;
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public DataSourceTask(Environment environment) {
+ super(environment);
+ }
+
@Override
public void invoke() throws Exception {
// --------------------------------------------------------------------
@@ -244,7 +254,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
this.taskCanceled = true;
LOG.debug(getLogString("Cancelling data source operator"));
}
-
+
/**
* Initializes the InputFormat implementation and configuration.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3c1e98e..16437d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -62,13 +61,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.WrappingRuntimeException;
@@ -80,6 +79,8 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
@@ -589,9 +590,6 @@ public class Task implements Runnable, TaskActions {
taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
}
- // now load the task's invokable code
- invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
-
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
@@ -653,8 +651,7 @@ public class Task implements Runnable, TaskActions {
// call the user code initialization methods
// ----------------------------------------------------------------
- TaskKvStateRegistry kvStateRegistry = network
- .createKvStateTaskRegistry(jobId, getJobVertexId());
+ TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
Environment env = new RuntimeEnvironment(
jobId,
@@ -680,25 +677,8 @@ public class Task implements Runnable, TaskActions {
metrics,
this);
- // let the task code create its readers and writers
- invokable.setEnvironment(env);
-
- // the very last thing before the actual execution starts running is to inject
- // the state into the task. the state is non-empty if this is an execution
- // of a task that failed but had backuped state from a checkpoint
-
- if (null != taskStateHandles) {
- if (invokable instanceof StatefulTask) {
- StatefulTask op = (StatefulTask) invokable;
- op.setInitialState(taskStateHandles);
- } else {
- throw new IllegalStateException("Found operator state for a non-stateful task invokable");
- }
- // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
- // we clear the reference to the state handle
- //noinspection UnusedAssignment
- taskStateHandles = null;
- }
+ // now load and instantiate the task's invokable code
+ invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env, taskStateHandles);
// ----------------------------------------------------------------
// actual task core work
@@ -893,23 +873,6 @@ public class Task implements Runnable, TaskActions {
return userCodeClassLoader;
}
- private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
- Class<? extends AbstractInvokable> invokableClass;
- try {
- invokableClass = Class.forName(className, true, classLoader)
- .asSubclass(AbstractInvokable.class);
- }
- catch (Throwable t) {
- throw new Exception("Could not load the task's invokable class.", t);
- }
- try {
- return invokableClass.newInstance();
- }
- catch (Throwable t) {
- throw new Exception("Could not instantiate the task's invokable class.", t);
- }
- }
-
private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
// cancel and release all distributed cache files
try {
@@ -991,8 +954,8 @@ public class Task implements Runnable, TaskActions {
@Override
public void run() {
try {
- ((StoppableTask)invokable).stop();
- } catch(RuntimeException e) {
+ ((StoppableTask) invokable).stop();
+ } catch (RuntimeException e) {
LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
taskManagerActions.failTask(executionId, e);
}
@@ -1172,8 +1135,7 @@ public class Task implements Runnable, TaskActions {
// ------------------------------------------------------------------------
/**
- * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
- * {@link StatefulTask}.
+ * Calls the invokable to trigger a checkpoint.
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
@@ -1188,52 +1150,43 @@ public class Task implements Runnable, TaskActions {
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
- if (invokable instanceof StatefulTask) {
- // build a local closure
- final StatefulTask statefulTask = (StatefulTask) invokable;
- final String taskName = taskNameWithSubtask;
- final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
- FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- // set safety net from the task's context for checkpointing thread
- LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
- FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
- try {
- boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
- if (!success) {
- checkpointResponder.declineCheckpoint(
- getJobID(), getExecutionId(), checkpointID,
- new CheckpointDeclineTaskNotReadyException(taskName));
- }
+ // build a local closure
+ final String taskName = taskNameWithSubtask;
+ final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
+ FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ // set safety net from the task's context for checkpointing thread
+ LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
+ FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
+
+ try {
+ boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
+ if (!success) {
+ checkpointResponder.declineCheckpoint(
+ getJobID(), getExecutionId(), checkpointID,
+ new CheckpointDeclineTaskNotReadyException(taskName));
}
- catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- failExternally(new Exception(
- "Error while triggering checkpoint " + checkpointID + " for " +
- taskNameWithSubtask, t));
- } else {
- LOG.debug("Encountered error while triggering checkpoint {} for " +
- "{} ({}) while being not in state running.", checkpointID,
- taskNameWithSubtask, executionId, t);
- }
- } finally {
- FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
+ }
+ catch (Throwable t) {
+ if (getExecutionState() == ExecutionState.RUNNING) {
+ failExternally(new Exception(
+ "Error while triggering checkpoint " + checkpointID + " for " +
+ taskNameWithSubtask, t));
+ } else {
+ LOG.debug("Encountered error while triggering checkpoint {} for " +
+ "{} ({}) while being not in state running.", checkpointID,
+ taskNameWithSubtask, executionId, t);
}
+ } finally {
+ FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
- };
- executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
- }
- else {
- checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
- new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
-
- LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
- taskNameWithSubtask, executionId);
-
- }
+ }
+ };
+ executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
@@ -1245,37 +1198,25 @@ public class Task implements Runnable, TaskActions {
}
public void notifyCheckpointComplete(final long checkpointID) {
- AbstractInvokable invokable = this.invokable;
+ final AbstractInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING && invokable != null) {
- if (invokable instanceof StatefulTask) {
-
- // build a local closure
- final StatefulTask statefulTask = (StatefulTask) invokable;
- final String taskName = taskNameWithSubtask;
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- statefulTask.notifyCheckpointComplete(checkpointID);
- }
- catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- // fail task if checkpoint confirmation failed.
- failExternally(new RuntimeException(
- "Error while confirming checkpoint",
- t));
- }
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ invokable.notifyCheckpointComplete(checkpointID);
+ }
+ catch (Throwable t) {
+ if (getExecutionState() == ExecutionState.RUNNING) {
+ // fail task if checkpoint confirmation failed.
+ failExternally(new RuntimeException("Error while confirming checkpoint", t));
}
}
- };
- executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
- }
- else {
- LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.",
- taskNameWithSubtask);
- }
+ }
+ };
+ executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskNameWithSubtask);
}
else {
LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
@@ -1344,7 +1285,7 @@ public class Task implements Runnable, TaskActions {
/**
* Utility method to dispatch an asynchronous call on the invokable.
- *
+ *
* @param runnable The async call runnable.
* @param callName The name of the call, for logging purposes.
*/
@@ -1413,6 +1354,88 @@ public class Task implements Runnable, TaskActions {
}
/**
+ * Instantiates the given task invokable class, passing the given environment (and possibly
+ * the initial task state) to the task's constructor.
+ *
+ * <p>The method will first try to instantiate the task via a constructor accepting both
+ * the Environment and the TaskStateSnapshot. If no such constructor exists, and there is
+ * no initial state, the method will fall back to the stateless convenience constructor that
+ * accepts only the Environment.
+ *
+ * @param classLoader The classloader to load the class through.
+ * @param className The name of the class to load.
+ * @param environment The task environment.
+ * @param initialState The task's initial state. Null, if the task is either stateless, or
+ * initialized with empty state.
+ *
+ * @return The instantiated invokable task object.
+ *
+ * @throws Throwable Forwards all exceptions that happen during initialization of the task.
+ * Also throws an exception if the task class misses the necessary constructor.
+ */
+ private static AbstractInvokable loadAndInstantiateInvokable(
+ ClassLoader classLoader,
+ String className,
+ Environment environment,
+ @Nullable TaskStateSnapshot initialState) throws Throwable {
+
+ final Class<? extends AbstractInvokable> invokableClass;
+ try {
+ invokableClass = Class.forName(className, true, classLoader)
+ .asSubclass(AbstractInvokable.class);
+ }
+ catch (Throwable t) {
+ throw new Exception("Could not load the task's invokable class.", t);
+ }
+
+ Constructor<? extends AbstractInvokable> statefulCtor = null;
+ Constructor<? extends AbstractInvokable> statelessCtor = null;
+
+ // try to find and call the constructor that accepts state
+ try {
+ //noinspection JavaReflectionMemberAccess
+ statefulCtor = invokableClass.getConstructor(Environment.class, TaskStateSnapshot.class);
+ }
+ catch (NoSuchMethodException e) {
+ if (initialState == null) {
+ // we allow also the constructor that takes no state, as a convenience for stateless
+ // tasks so that they are not forced to carry the stateful constructor
+ try {
+ statelessCtor = invokableClass.getConstructor(Environment.class);
+ }
+ catch (NoSuchMethodException ee) {
+ throw new FlinkException("Task misses proper constructor", ee);
+ }
+ }
+ else {
+ throw new FlinkException("Task has state to restore, but misses the stateful constructor", e);
+ }
+ }
+
+ // instantiate the class
+ try {
+ if (statefulCtor != null) {
+ return statefulCtor.newInstance(environment, initialState);
+ }
+ else {
+ //noinspection ConstantConditions --> cannot happen
+ return statelessCtor.newInstance(environment);
+ }
+ }
+ catch (InvocationTargetException e) {
+ // directly forward exceptions from the eager initialization
+ throw e.getTargetException();
+ }
+ catch (Exception e) {
+ throw new FlinkException("Could not instantiate the task's invokable class.", e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // TaskCanceler
+ // ------------------------------------------------------------------------
+
+ /**
* This runner calls cancel() on the invokable and periodically interrupts the
* thread until it has terminated.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 482290a..7d6c7b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -191,24 +193,20 @@ public class CoordinatorShutdownTest extends TestLogger {
}
public static class BlockingInvokable extends AbstractInvokable {
- private static boolean blocking = true;
- private static final Object lock = new Object();
+
+ private static final OneShotLatch LATCH = new OneShotLatch();
+
+ public BlockingInvokable(Environment environment) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
- while (blocking) {
- synchronized (lock) {
- lock.wait();
- }
- }
+ LATCH.await();
}
public static void unblock() {
- blocking = false;
-
- synchronized (lock) {
- lock.notifyAll();
- }
+ LATCH.trigger();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index 301d206..ff15d67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -142,6 +143,10 @@ public class JobClientActorRecoveryITCase extends TestLogger {
private volatile static int HasBlockedExecution = 0;
private static Object waitLock = new Object();
+ public BlockingTask(Environment environment) {
+ super(environment);
+ }
+
@Override
public void invoke() throws Exception {
if (BlockExecution > 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f57726c..9481d57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -110,6 +111,10 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
*/
public static class SlowBufferSender extends AbstractInvokable {
+ public SlowBufferSender(Environment environment) {
+ super(environment);
+ }
+
@Override
public void invoke() throws Exception {
final ResultPartitionWriter writer = getEnvironment().getWriter(0);
@@ -128,6 +133,10 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
*/
public static class SingleBufferReceiver extends AbstractInvokable {
+ public SingleBufferReceiver(Environment environment) {
+ super(environment);
+ }
+
@Override
public void invoke() throws Exception {
InputGate gate = getEnvironment().getInputGate(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 12bb95e..79f7342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -56,7 +58,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -97,6 +98,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -120,6 +122,8 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
+import javax.annotation.Nullable;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -486,28 +490,23 @@ public class JobManagerHARecoveryTest extends TestLogger {
public static class BlockingInvokable extends AbstractInvokable {
- private static boolean blocking = true;
- private static Object lock = new Object();
+ private static final OneShotLatch LATCH = new OneShotLatch();
+
+ public BlockingInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
- while (blocking) {
- synchronized (lock) {
- lock.wait();
- }
- }
+ LATCH.await();
}
public static void unblock() {
- blocking = false;
-
- synchronized (lock) {
- lock.notifyAll();
- }
+ LATCH.trigger();
}
}
- public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask {
+ public static class BlockingStatefulInvokable extends BlockingInvokable {
private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
@@ -517,15 +516,18 @@ public class JobManagerHARecoveryTest extends TestLogger {
private int completedCheckpoints = 0;
- @Override
- public void setInitialState(
- TaskStateSnapshot taskStateHandles) throws Exception {
+ public BlockingStatefulInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+ super(environment, initialState);
+
int subtaskIndex = getIndexInSubtaskGroup();
- if (subtaskIndex < recoveredStates.length) {
- OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
+ if (initialState != null && subtaskIndex < recoveredStates.length) {
+ OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(initialState);
try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
}
+ catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5bc207a..ecf2ae3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManage
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -61,7 +62,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
@@ -1525,19 +1525,24 @@ public class JobManagerTest extends TestLogger {
/**
* A blocking stateful source task that declines savepoints.
*/
- public static class FailOnSavepointSourceTask extends AbstractInvokable implements StatefulTask {
+ public static class FailOnSavepointSourceTask extends AbstractInvokable {
private static final CountDownLatch CHECKPOINT_AFTER_SAVEPOINT_LATCH = new CountDownLatch(1);
private boolean receivedSavepoint;
- @Override
- public void invoke() throws Exception {
- new CountDownLatch(1).await();
+ /**
+ * Create an Invokable task and set its environment.
+ *
+ * @param environment The environment assigned to this invokable.
+ */
+ public FailOnSavepointSourceTask(Environment environment, TaskStateSnapshot initialState) {
+ super(environment);
}
@Override
- public void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception {
+ public void invoke() throws Exception {
+ new CountDownLatch(1).await();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 49b11b5..4ce83cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -19,18 +19,19 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
-
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,9 +41,9 @@ import java.util.BitSet;
public class SlotCountExceedingParallelismTest extends TestLogger {
// Test configuration
- private final static int NUMBER_OF_TMS = 2;
- private final static int NUMBER_OF_SLOTS_PER_TM = 2;
- private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+ private static final int NUMBER_OF_TMS = 2;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
private static TestingCluster flink;
@@ -120,7 +121,11 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
*/
public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
- public final static String CONFIG_KEY = "number-of-times-to-send";
+ public static final String CONFIG_KEY = "number-of-times-to-send";
+
+ public RoundRobinSubtaskIndexSender(Environment environment) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
@@ -147,7 +152,11 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
*/
public static class SubtaskIndexReceiver extends AbstractInvokable {
- public final static String CONFIG_KEY = "number-of-indexes-to-receive";
+ public static final String CONFIG_KEY = "number-of-indexes-to-receive";
+
+ public SubtaskIndexReceiver(Environment environment) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index d861455..902c925 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -42,9 +43,9 @@ import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismT
public class ScheduleOrUpdateConsumersTest extends TestLogger {
- private final static int NUMBER_OF_TMS = 2;
- private final static int NUMBER_OF_SLOTS_PER_TM = 2;
- private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+ private static final int NUMBER_OF_TMS = 2;
+ private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+ private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
private static TestingCluster flink;
@@ -124,7 +125,11 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
- public final static String CONFIG_KEY = "number-of-times-to-send";
+ public static final String CONFIG_KEY = "number-of-times-to-send";
+
+ public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
+ super(environment);
+ }
@Override
public void invoke() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 1fd004f..c92b2f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -77,9 +77,9 @@ public class DataSinkTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
- super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
testTask.invoke();
@@ -139,9 +139,9 @@ public class DataSinkTaskTest extends TaskTestBase {
readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
- DataSinkTask<Record> testTask = new DataSinkTask<>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
- super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
try {
// For the union reader to work, we need to start notifications *after* the union reader
@@ -215,7 +215,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
// set sorting
super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
@@ -225,7 +225,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
- super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
try {
testTask.invoke();
@@ -293,11 +293,11 @@ public class DataSinkTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
- super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
boolean stubFailed = false;
@@ -325,7 +325,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
@@ -337,7 +337,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
- super.registerFileOutputTask(testTask, MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString());
boolean stubFailed = false;
@@ -359,11 +359,11 @@ public class DataSinkTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
- final DataSinkTask<Record> testTask = new DataSinkTask<>();
+ final DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
- super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
Thread taskRunner = new Thread() {
@Override
@@ -407,7 +407,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
- final DataSinkTask<Record> testTask = new DataSinkTask<>();
+ final DataSinkTask<Record> testTask = new DataSinkTask<>(this.mockEnv);
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
@@ -419,7 +419,7 @@ public class DataSinkTaskTest extends TaskTestBase {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
- super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
+ super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString());
Thread taskRunner = new Thread() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 82f3d1d..9a3c956 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -78,8 +78,8 @@ public class DataSourceTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
- DataSourceTask<Record> testTask = new DataSourceTask<>();
-
+ DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
+
super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
try {
@@ -144,7 +144,7 @@ public class DataSourceTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
- DataSourceTask<Record> testTask = new DataSourceTask<>();
+ DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
@@ -178,7 +178,7 @@ public class DataSourceTaskTest extends TaskTestBase {
Assert.fail("Unable to set-up test input file");
}
- final DataSourceTask<Record> testTask = new DataSourceTask<>();
+ final DataSourceTask<Record> testTask = new DataSourceTask<>(this.mockEnv);
super.registerFileInputTask(testTask, MockDelayingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
new file mode 100644
index 0000000..5880f05
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/InvokableClassConstructorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
+import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that validate that stateless/stateful task implementations have the corresponding constructors.
+ */
+public class InvokableClassConstructorTest {
+
+ private static final Class<?>[] STATELESS_TASKS = {
+ IterationHeadTask.class,
+ IterationIntermediateTask.class,
+ IterationTailTask.class,
+ IterationSynchronizationSinkTask.class,
+ DataSourceTask.class,
+ DataSinkTask.class
+ };
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testNoStatefulConstructor() throws Exception {
+ for (Class<?> clazz: STATELESS_TASKS) {
+
+ // check that there is a constructor for Environment only
+ clazz.getConstructor(Environment.class);
+
+ try {
+ // check that there is NO constructor for Environment and Task State
+ clazz.getDeclaredConstructor(Environment.class, TaskStateSnapshot.class);
+ fail("Should fail with an exception");
+ }
+ catch (NoSuchMethodException e) {
+ // expected
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 0a9efb0..6f15884 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.BatchTask;
@@ -41,6 +40,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.Test;
@@ -96,9 +96,8 @@ public class ChainTaskTest extends TaskTestBase {
// chained map+combine
{
- BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
- new BatchTask<>();
- registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
+ registerTask(FlatMapDriver.class, MockMapStub.class);
+ BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
try {
testTask.invoke();
@@ -157,19 +156,16 @@ public class ChainTaskTest extends TaskTestBase {
// chained map+combine
{
- final BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
- new BatchTask<>();
-
- super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
-
+ registerTask(FlatMapDriver.class, MockMapStub.class);
+ final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
boolean stubFailed = false;
-
try {
testTask.invoke();
} catch (Exception e) {
stubFailed = true;
}
-
+
Assert.assertTrue("Function exception was not forwarded.", stubFailed);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
index 51ac665..a81e959 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriverTest.java
@@ -92,8 +92,8 @@ public class ChainedAllReduceDriverTest extends TaskTestBase {
// chained map+reduce
{
- BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>();
- registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
+ registerTask(FlatMapDriver.class, MockMapStub.class);
+ BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(mockEnv);
try {
testTask.invoke();
http://git-wip-us.apache.org/repos/asf/flink/blob/6033de01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
index c47ab33..cb0faf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyInvokable.java
@@ -20,13 +20,25 @@ package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import javax.annotation.Nullable;
+
/**
* An invokable that does nothing.
*/
public class DummyInvokable extends AbstractInvokable {
+ public DummyInvokable() {
+ super(new DummyEnvironment("test", 1, 0));
+ }
+
+ public DummyInvokable(Environment environment, @Nullable TaskStateSnapshot initialState) {
+ super(environment);
+ }
+
@Override
public void invoke() {}