You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/02 12:21:40 UTC
flink git commit: [FLINK-5218] [state backends] Add test that
validates that Checkpoint Streams are eagerly closed on cancellation.
Repository: flink
Updated Branches:
refs/heads/master d2607170e -> cc006ff18
[FLINK-5218] [state backends] Add test that validates that Checkpoint Streams are eagerly closed on cancellation.
This is important for some stream implementations (such as HDFS) that do not properly
handle thread interruption.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc006ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc006ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc006ff1
Branch: refs/heads/master
Commit: cc006ff18cc7032de3be3fdd9ef7ad383e88bba0
Parents: d260717
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Dec 1 17:12:12 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 2 12:37:00 2016 +0100
----------------------------------------------------------------------
.../flink/hdfstests/FileStateBackendTest.java | 9 +-
.../filesystem/FsCheckpointStreamFactory.java | 82 +++--
.../runtime/state/FileStateBackendTest.java | 5 +-
.../FsCheckpointStateOutputStreamTest.java | 32 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 300 +++++++++++++++++++
5 files changed, 389 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 080485e..109d152 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -94,11 +95,9 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
catch (Exception ignored) {}
}
- private URI stateBaseURI;
-
@Override
protected FsStateBackend getStateBackend() throws Exception {
- stateBaseURI = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
+ URI stateBaseURI = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
return new FsStateBackend(stateBaseURI);
}
@@ -191,8 +190,8 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
validateBytesInStream(handle2.openInputStream(), state2);
handle2.discardState();
- validateBytesInStream(handle3.openInputStream(), state3);
- handle3.discardState();
+ // stream 3 has zero bytes, so it should not return anything
+ assertNull(handle3);
validateBytesInStream(handle4.openInputStream(), state4);
handle4.discardState();
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index fcc97b3..135ee04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -156,12 +156,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
private final Path basePath;
private final FileSystem fs;
-
+
private Path statePath;
-
- private boolean closed;
- private boolean isEmpty = true;
+ private volatile boolean closed;
public FsCheckpointStateOutputStream(
Path basePath, FileSystem fs,
@@ -170,7 +168,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
if (bufferSize < localStateThreshold) {
throw new IllegalArgumentException();
}
-
+
this.basePath = basePath;
this.fs = fs;
this.writeBuffer = new byte[bufferSize];
@@ -183,8 +181,6 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
flush();
}
writeBuffer[pos++] = (byte) b;
-
- isEmpty = false;
}
@Override
@@ -198,7 +194,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
off += remaining;
len -= remaining;
pos += remaining;
-
+
// flush the write buffer to make it clear again
flush();
}
@@ -213,7 +209,6 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
// write the bytes directly
outStream.write(b, off, len);
}
- isEmpty = false;
}
@Override
@@ -226,32 +221,18 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
if (!closed) {
// initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
if (outStream == null) {
- // make sure the directory for that specific checkpoint exists
- fs.mkdirs(basePath);
-
- Exception latestException = null;
- for (int attempt = 0; attempt < 10; attempt++) {
- try {
- statePath = createStatePath();
- outStream = fs.create(statePath, false);
- break;
- }
- catch (Exception e) {
- latestException = e;
- }
- }
-
- if (outStream == null) {
- throw new IOException("Could not open output stream for state backend", latestException);
- }
+ createStream();
}
-
+
// now flush
if (pos > 0) {
outStream.write(writeBuffer, 0, pos);
pos = 0;
}
}
+ else {
+ throw new IOException("closed");
+ }
}
@Override
@@ -260,6 +241,14 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
}
/**
+ * Checks whether the stream is closed.
+ * @return True if the stream was closed, false if it is still open.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
* If the stream is only closed, we remove the produced file (cleanup through the auto close
* feature, for example). This method throws no exception if the deletion fails, but only
* logs the error.
@@ -268,6 +257,11 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
public void close() {
if (!closed) {
closed = true;
+
+ // force all subsequent write requests to go through 'flush()', where it is
+ // recognized that the stream is closed
+ pos = writeBuffer.length;
+
if (outStream != null) {
try {
outStream.close();
@@ -287,7 +281,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
@Override
public StreamStateHandle closeAndGetHandle() throws IOException {
- if (isEmpty) {
+ // check if there was nothing ever written
+ if (outStream == null && pos == 0) {
return null;
}
@@ -296,19 +291,23 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
if (outStream == null && pos <= localStateThreshold) {
closed = true;
byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+ pos = writeBuffer.length;
return new ByteStreamStateHandle(createStatePath().toString(), bytes);
}
else {
flush();
+ closed = true;
+ pos = writeBuffer.length;
+
long size = -1;
// make a best effort attempt to figure out the size
try {
size = outStream.getPos();
} catch (Exception ignored) {}
-
+
outStream.close();
- closed = true;
+
return new FileStateHandle(statePath, size);
}
}
@@ -321,5 +320,26 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
private Path createStatePath() {
return new Path(basePath, UUID.randomUUID().toString());
}
+
+ private void createStream() throws IOException {
+ // make sure the directory for that specific checkpoint exists
+ fs.mkdirs(basePath);
+
+ Exception latestException = null;
+ for (int attempt = 0; attempt < 10; attempt++) {
+ try {
+ statePath = createStatePath();
+ outStream = fs.create(statePath, false);
+ break;
+ }
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (outStream == null) {
+ throw new IOException("Could not open output stream for state backend", latestException);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 0b04ebc..57f4572 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -38,6 +38,7 @@ import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -138,8 +139,8 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
validateBytesInStream(handle2.openInputStream(), state2);
handle2.discardState();
- validateBytesInStream(handle3.openInputStream(), state3);
- handle3.discardState();
+ // nothing was written to the stream, so it will return nothing
+ assertNull(handle3);
validateBytesInStream(handle4.openInputStream(), state4);
handle4.discardState();
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 00d8ca8..6d371b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -21,17 +21,22 @@ package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class FsCheckpointStateOutputStreamTest {
@@ -57,7 +62,7 @@ public class FsCheckpointStateOutputStreamTest {
}
@Test
- public void testStateBlowMemThreshold() throws Exception {
+ public void testStateBelowMemThreshold() throws Exception {
runTest(222, 999, 512, false);
}
@@ -150,4 +155,29 @@ public class FsCheckpointStateOutputStreamTest {
handle.discardState();
}
+
+ @Test
+ public void testWriteFailsFastWhenClosed() throws Exception {
+ FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+
+ assertFalse(stream.isClosed());
+
+ stream.close();
+ assertTrue(stream.isClosed());
+
+ try {
+ stream.write(1);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+
+ try {
+ stream.write(new byte[4], 1, 2);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..6e96400
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are unblocked on task cancellation, because cancellation eagerly closes the checkpoint streams.
+ */
+public class BlockingCheckpointsTest {
+
+ private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+ @Test
+ public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+ Configuration taskConfig = new Configuration();
+ StreamConfig cfg = new StreamConfig(taskConfig);
+ cfg.setStreamOperator(new TestOperator());
+ cfg.setStateBackend(new LockingStreamStateBackend());
+
+ Task task = createTask(taskConfig);
+
+ // start the task and wait until it is inside the checkpoint (in "snapshotState")
+ task.startTaskThread();
+ IN_CHECKPOINT_LATCH.await();
+
+ // cancel the task and wait. unless cancellation properly closes
+ // the streams, this will never terminate
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ assertNull(task.getFailureCause());
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static Task createTask(Configuration taskConfig) throws IOException {
+
+ JobInformation jobInformation = new JobInformation(
+ new JobID(),
+ "test job name",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList());
+
+ TaskInformation taskInformation = new TaskInformation(
+ new JobVertexID(),
+ "test task name",
+ 1,
+ 11,
+ TestStreamTask.class.getName(),
+ taskConfig);
+
+ TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class);
+ NetworkEnvironment network = mock(NetworkEnvironment.class);
+ when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
+
+ return new Task(
+ jobInformation,
+ taskInformation,
+ new ExecutionAttemptID(),
+ 0,
+ 0,
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0,
+ null,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ network,
+ mock(BroadcastVariableManager.class),
+ mock(TaskManagerConnection.class),
+ mock(InputSplitProvider.class),
+ mock(CheckpointResponder.class),
+ new FallbackLibraryCacheManager(),
+ new FileCache(new Configuration()),
+ new TaskManagerRuntimeInfo(
+ "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+ new UnregisteredTaskMetricsGroup(),
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(PartitionStateChecker.class),
+ Executors.directExecutor());
+
+ }
+
+ // ------------------------------------------------------------------------
+ // state backend with locking output stream
+ // ------------------------------------------------------------------------
+
+ private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+ return new LockingOutputStreamFactory();
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env, JobID jobID, String operatorIdentifier,
+ TypeSerializer<K> keySerializer, int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+ Environment env, JobID jobID, String operatorIdentifier,
+ TypeSerializer<K> keySerializer, int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange, Collection<KeyGroupsStateHandle> restoredState,
+ TaskKvStateRegistry kvStateRegistry) throws Exception {
+
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
+ return new LockingOutputStream();
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ private static final class LockingOutputStream extends CheckpointStateOutputStream {
+
+ private final Object lock = new Object();
+ private volatile boolean closed;
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ // this needs to not react to interrupts until the handle is closed
+ synchronized (lock) {
+ while (!closed) {
+ try {
+ lock.wait();
+ }
+ catch (InterruptedException ignored) {}
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ closed = true;
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public long getPos() {
+ return 0;
+ }
+
+ @Override
+ public void flush() {}
+
+ @Override
+ public void sync() {}
+ }
+
+ // ------------------------------------------------------------------------
+ // test filter operator that calls into the locking checkpoint output stream
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class TestOperator extends StreamFilter<Object> {
+ private static final long serialVersionUID = 1L;
+
+ public TestOperator() {
+ super(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) {
+ return false;
+ }
+ });
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ OperatorStateCheckpointOutputStream outStream = context.getRawOperatorStateOutput();
+
+ IN_CHECKPOINT_LATCH.trigger();
+
+ // this should lock
+ outStream.write(1);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // stream task that simply triggers a checkpoint
+ // ------------------------------------------------------------------------
+
+ public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+ @Override
+ public void init() {}
+
+ @Override
+ protected void run() throws Exception {
+ triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()));
+ }
+
+ @Override
+ protected void cleanup() {}
+
+ @Override
+ protected void cancelTask() {}
+ }
+}
\ No newline at end of file