You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:42 UTC
[3/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff14249..8ada6d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -46,9 +47,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -558,9 +559,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
try {
- return performCheckpoint(checkpointId, timestamp, 0L, 0L);
+ checkpointMetaData.
+ setBytesBufferedInAlignment(0L).
+ setAlignmentDurationNanos(0L);
+ return performCheckpoint(checkpointMetaData);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -573,11 +577,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
try {
- performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+ performCheckpoint(checkpointMetaData);
}
catch (CancelTaskException e) {
throw e;
@@ -587,8 +590,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(
- long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+
+ long checkpointId = checkpointMetaData.getCheckpointId();
+ long timestamp = checkpointMetaData.getTimestamp();
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
@@ -674,8 +679,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
- final long endOfSyncPart = System.nanoTime();
- final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+ final long syncEndNanos = System.nanoTime();
+ final long syncDurationMillis = (syncEndNanos - startOfSyncPart) / 1_000_000;
+
+ checkpointMetaData.setSyncDurationMillis(syncDurationMillis);
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
"checkpoint-" + checkpointId + "-" + timestamp,
@@ -684,11 +691,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
chainedNonPartitionedStateHandles,
chainedPartitionedStateHandles,
keyGroupsStateHandleFuture,
- checkpointId,
- bytesBufferedAlignment,
- alignmentDurationNanos,
- syncDurationMillis,
- endOfSyncPart);
+ checkpointMetaData,
+ syncEndNanos);
cancelables.registerClosable(asyncCheckpointRunnable);
asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -696,7 +700,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
- getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+ getName(), checkpointId, checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, syncDurationMillis);
}
return true;
@@ -914,15 +918,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture;
- private final long checkpointId;
-
private final String name;
- private final long bytesBufferedInAlignment;
-
- private final long alignmentDurationNanos;
-
- private final long syncDurationMillies;
+ private final CheckpointMetaData checkpointMetaData;
private final long asyncStartNanos;
@@ -933,11 +931,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
- long checkpointId,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos,
- long syncDurationMillies,
- long asyncStartNanos) {
+ CheckpointMetaData checkpointMetaData,
+ long asyncStartNanos
+ ) {
this.name = name;
this.owner = owner;
@@ -945,10 +941,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.nonPartitionedStateHandles = nonPartitionedStateHandles;
this.partitioneableStateHandles = partitioneableStateHandles;
this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
- this.checkpointId = checkpointId;
- this.bytesBufferedInAlignment = bytesBufferedInAlignment;
- this.alignmentDurationNanos = alignmentDurationNanos;
- this.syncDurationMillies = syncDurationMillies;
+ this.checkpointMetaData = checkpointMetaData;
this.asyncStartNanos = asyncStartNanos;
}
@@ -974,26 +967,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
+ checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+
if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
- owner.getEnvironment().acknowledgeCheckpoint(
- checkpointId,
- syncDurationMillies, asyncDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
} else {
CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
nonPartitionedStateHandles,
partitioneableStateHandles,
keyedStates);
- owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
- allStateHandles,
- syncDurationMillies, asyncDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, allStateHandles);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
- owner.getName(), checkpointId, asyncDurationMillis);
+ owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 5d68841..2cabd70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -981,19 +982,17 @@ public class BarrierBufferTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp,
- long bytesAligned, long alignmentTimeNanos) throws Exception {
-
- assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
- assertTrue(timestamp > 0);
- assertTrue(bytesAligned >= 0);
- assertTrue(alignmentTimeNanos >= 0);
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+
+ assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
+ assertTrue(checkpointMetaData.getTimestamp() > 0);
+ assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
+ assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
nextExpectedCheckpointId++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index f2f9092..b6d0450 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -374,18 +375,16 @@ public class BarrierTrackerTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp,
- long bytesAligned, long alignmentTimeNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
- assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
- assertTrue(timestamp > 0);
+ assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointMetaData.getCheckpointId());
+ assertTrue(checkpointMetaData.getTimestamp() > 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f6e7dca..1b2b723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -374,7 +375,9 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke(env);
testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
- while(!streamTask.triggerCheckpoint(checkpointId, checkpointTimestamp));
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
+
+ while(!streamTask.triggerCheckpoint(checkpointMetaData));
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
@@ -517,11 +520,10 @@ public class OneInputStreamTaskTest extends TestLogger {
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
- this.checkpointId = checkpointId;
+ this.checkpointId = checkpointMetaData.getCheckpointId();
if(checkpointStateHandles != null) {
this.state = checkpointStateHandles.getNonPartitionedStateHandles();
this.keyGroupStates = checkpointStateHandles.getKeyGroupsStateHandle();
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1e62e28..10fc400 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -29,11 +30,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
-
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -235,7 +234,8 @@ public class SourceStreamTaskTest {
public Boolean call() throws Exception {
for (int i = 0; i < numCheckpoints; i++) {
long currentCheckpointId = checkpointId.getAndIncrement();
- sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
+ sourceTask.triggerCheckpoint(checkpointMetaData);
Thread.sleep(checkpointInterval);
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..f852682 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,18 +43,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -311,18 +308,12 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
}
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
}
@Override