You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:40 UTC
[1/4] flink git commit: [hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test
Repository: flink
Updated Branches:
refs/heads/master 98710ead5 -> 0dac7ad00
[hotfix] Minor cleanup in FlinkKafkaConsumerBase and Test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/241d47b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/241d47b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/241d47b8
Branch: refs/heads/master
Commit: 241d47b867cc6bb21d11c6416697bd078885cf6d
Parents: fee1430
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 11:30:59 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumerBase.java | 3 ---
.../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 10 +++++-----
2 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/241d47b8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 8d63345..4b6cac7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -47,7 +47,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -123,8 +122,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
*/
public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
this.topics = checkNotNull(topics);
- checkArgument(topics.size() > 0, "You have to define at least one topic.");
-
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/241d47b8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 373d6ab..76a69bc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -36,7 +36,6 @@ import org.mockito.Matchers;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -173,9 +172,9 @@ public class FlinkKafkaConsumerBaseTest {
state2.put(new KafkaTopicPartition("def", 7), 987654329L);
final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
- state2.put(new KafkaTopicPartition("abc", 13), 16780L);
- state2.put(new KafkaTopicPartition("def", 7), 987654377L);
-
+ state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+ state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
@@ -186,12 +185,13 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore backend = mock(OperatorStateStore.class);
+ TestingListState<Serializable> init = new TestingListState<>();
TestingListState<Serializable> listState1 = new TestingListState<>();
TestingListState<Serializable> listState2 = new TestingListState<>();
TestingListState<Serializable> listState3 = new TestingListState<>();
when(backend.getSerializableListState(Matchers.any(String.class))).
- thenReturn(listState1, listState1, listState2, listState3);
+ thenReturn(init, listState1, listState2, listState3);
consumer.initializeState(backend);
[4/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData
Posted by uc...@apache.org.
[FLINK-4730] Introducing CheckpointMetaData
This closes #2583.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dac7ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dac7ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dac7ad0
Branch: refs/heads/master
Commit: 0dac7ad00a25c4b038b57009f41f5512f8ce73c3
Parents: 241d47b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:19:37 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:12 2016 +0200
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 15 +--
.../runtime/checkpoint/CheckpointMetaData.java | 113 ++++++++++++++++
.../runtime/checkpoint/CheckpointMetrics.java | 131 +++++++++++++++++++
.../flink/runtime/execution/Environment.java | 41 ++----
.../runtime/jobgraph/tasks/StatefulTask.java | 16 +--
.../checkpoint/AcknowledgeCheckpoint.java | 56 +++-----
.../ActorGatewayCheckpointResponder.java | 22 ++--
.../taskmanager/CheckpointResponder.java | 30 ++---
.../runtime/taskmanager/RuntimeEnvironment.java | 28 ++--
.../apache/flink/runtime/taskmanager/Task.java | 10 +-
.../checkpoint/CheckpointCoordinatorTest.java | 118 ++++++++++-------
.../checkpoint/CheckpointStateRestoreTest.java | 24 ++--
.../jobmanager/JobManagerHARecoveryTest.java | 13 +-
.../messages/CheckpointMessagesTest.java | 5 +-
.../operators/testutils/DummyEnvironment.java | 17 +--
.../operators/testutils/MockEnvironment.java | 15 +--
.../runtime/taskmanager/TaskAsyncCallTest.java | 16 +--
.../kafka/FlinkKafkaConsumerBase.java | 2 +
.../kafka/FlinkKafkaConsumerBaseTest.java | 13 +-
.../streaming/runtime/io/BarrierBuffer.java | 20 +--
.../streaming/runtime/io/BarrierTracker.java | 22 +++-
.../streaming/runtime/tasks/StreamTask.java | 71 +++++-----
.../streaming/runtime/io/BarrierBufferTest.java | 17 ++-
.../runtime/io/BarrierTrackerTest.java | 11 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 12 +-
.../runtime/tasks/SourceStreamTaskTest.java | 6 +-
.../runtime/tasks/StreamMockEnvironment.java | 19 +--
27 files changed, 515 insertions(+), 348 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 2ebd84a..8f58075 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -134,14 +135,10 @@ public class RocksDBAsyncSnapshotTest {
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
- super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles,
- synchronousDurationMillis, asynchronousDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ super.acknowledgeCheckpoint(checkpointMetaData);
// block on the latch, to verify that triggerCheckpoint returns below,
// even though the async checkpoint would not finish
@@ -171,7 +168,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(42, 17);
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17));
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -250,7 +247,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(42, 17);
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17));
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
task.cancel();
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
new file mode 100644
index 0000000..6f117f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Encapsulates all the meta data for a checkpoint.
+ */
+public class CheckpointMetaData implements Serializable {
+
+ private static final long serialVersionUID = -2387652345781312442L;
+
+ /** The ID of the checkpoint */
+ private final long checkpointId;
+
+ /** The timestamp of the checkpoint */
+ private final long timestamp;
+
+ private final CheckpointMetrics metrics;
+
+ public CheckpointMetaData(long checkpointId, long timestamp) {
+ this.checkpointId = checkpointId;
+ this.timestamp = timestamp;
+ this.metrics = new CheckpointMetrics();
+ }
+
+ public CheckpointMetaData(
+ long checkpointId,
+ long timestamp,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos) {
+ this.checkpointId = checkpointId;
+ this.timestamp = timestamp;
+ this.metrics = new CheckpointMetrics(
+ bytesBufferedInAlignment,
+ alignmentDurationNanos,
+ synchronousDurationMillis,
+ asynchronousDurationMillis);
+ }
+
+ public CheckpointMetrics getMetrics() {
+ return metrics;
+ }
+
+ public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+ Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
+ this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
+ return this;
+ }
+
+ public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
+ Preconditions.checkArgument(alignmentDurationNanos >= 0);
+ this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
+ return this;
+ }
+
+ public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
+ Preconditions.checkArgument(syncDurationMillis >= 0);
+ this.metrics.setSyncDurationMillis(syncDurationMillis);
+ return this;
+ }
+
+ public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
+ Preconditions.checkArgument(asyncDurationMillis >= 0);
+ this.metrics.setAsyncDurationMillis(asyncDurationMillis);
+ return this;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public long getBytesBufferedInAlignment() {
+ return metrics.getBytesBufferedInAlignment();
+ }
+
+ public long getAlignmentDurationNanos() {
+ return metrics.getAlignmentDurationNanos();
+ }
+
+ public long getSyncDurationMillis() {
+ return metrics.getSyncDurationMillis();
+ }
+
+ public long getAsyncDurationMillis() {
+ return metrics.getAsyncDurationMillis();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
new file mode 100644
index 0000000..4155290
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+public class CheckpointMetrics implements Serializable {
+
+ /**
+ * The number of bytes that were buffered during the checkpoint alignment phase
+ */
+ private long bytesBufferedInAlignment;
+
+ /**
+ * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+ */
+ private long alignmentDurationNanos;
+
+ /* The duration (in milliseconds) of the synchronous part of the operator checkpoint */
+ private long syncDurationMillis;
+
+ /* The duration (in milliseconds) of the asynchronous part of the operator checkpoint */
+ private long asyncDurationMillis;
+
+ public CheckpointMetrics() {
+ this(-1L, -1L, -1L, -1L);
+ }
+
+ public CheckpointMetrics(
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos,
+ long syncDurationMillis,
+ long asyncDurationMillis) {
+
+ this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+ this.alignmentDurationNanos = alignmentDurationNanos;
+ this.syncDurationMillis = syncDurationMillis;
+ this.asyncDurationMillis = asyncDurationMillis;
+ }
+
+ public long getBytesBufferedInAlignment() {
+ return bytesBufferedInAlignment;
+ }
+
+ public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+ this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+ }
+
+ public long getAlignmentDurationNanos() {
+ return alignmentDurationNanos;
+ }
+
+ public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+ this.alignmentDurationNanos = alignmentDurationNanos;
+ }
+
+ public long getSyncDurationMillis() {
+ return syncDurationMillis;
+ }
+
+ public void setSyncDurationMillis(long syncDurationMillis) {
+ this.syncDurationMillis = syncDurationMillis;
+ }
+
+ public long getAsyncDurationMillis() {
+ return asyncDurationMillis;
+ }
+
+ public void setAsyncDurationMillis(long asyncDurationMillis) {
+ this.asyncDurationMillis = asyncDurationMillis;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CheckpointMetrics that = (CheckpointMetrics) o;
+
+ if (bytesBufferedInAlignment != that.bytesBufferedInAlignment) {
+ return false;
+ }
+ if (alignmentDurationNanos != that.alignmentDurationNanos) {
+ return false;
+ }
+ if (syncDurationMillis != that.syncDurationMillis) {
+ return false;
+ }
+ return asyncDurationMillis == that.asyncDurationMillis;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (bytesBufferedInAlignment ^ (bytesBufferedInAlignment >>> 32));
+ result = 31 * result + (int) (alignmentDurationNanos ^ (alignmentDurationNanos >>> 32));
+ result = 31 * result + (int) (syncDurationMillis ^ (syncDurationMillis >>> 32));
+ result = 31 * result + (int) (asyncDurationMillis ^ (asyncDurationMillis >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointMetrics{" +
+ "bytesBufferedInAlignment=" + bytesBufferedInAlignment +
+ ", alignmentDurationNanos=" + alignmentDurationNanos +
+ ", syncDurationMillis=" + syncDurationMillis +
+ ", asyncDurationMillis=" + asyncDurationMillis +
+ '}';
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index f6cde95..cbbeec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,20 +19,21 @@
package org.apache.flink.runtime.execution;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KvState;
@@ -161,47 +162,21 @@ public interface Environment {
* to for the checkpoint with the give checkpoint-ID. This method does not include
* any state in the checkpoint.
*
- * @param checkpointId
- * The ID of the checkpoint.
- * @param synchronousDurationMillis
- * The duration (in milliseconds) of the synchronous part of the operator checkpoint
- * @param asynchronousDurationMillis
- * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
- * @param bytesBufferedInAlignment
- * The number of bytes that were buffered during the checkpoint alignment phase
- * @param alignmentDurationNanos
- * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+ * @param checkpointMetaData the meta data for this checkpoint
*/
- void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos);
+ void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);
/**
* Confirms that the invokable has successfully completed all required steps for
* the checkpoint with the give checkpoint-ID. This method does include
* the given state in the checkpoint.
*
- * @param checkpointId The ID of the checkpoint.
* @param checkpointStateHandles All state handles for the checkpointed state
- * @param synchronousDurationMillis
- * The duration (in milliseconds) of the synchronous part of the operator checkpoint
- * @param asynchronousDurationMillis
- * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
- * @param bytesBufferedInAlignment
- * The number of bytes that were buffered during the checkpoint alignment phase
- * @param alignmentDurationNanos
- * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+ * @param checkpointMetaData the meta data for this checkpoint
*/
void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos);
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles);
/**
* Marks task execution failed for an external reason (a reason other than the task code itself
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 55e3e09..e1d15e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobgraph.tasks;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -52,29 +53,24 @@ public interface StatefulTask {
*
* <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
* i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
- * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+ * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
* method.
*
- * @param checkpointId The ID of the checkpoint, strictly incrementing.
- * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+ * @param checkpointMetaData Meta data about this checkpoint
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
- boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+ boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
/**
* This method is called when a checkpoint is triggered as a result of receiving checkpoint
* barriers on all input streams.
*
- * @param checkpointId The ID of the checkpoint, strictly incrementing.
- * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
- * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
- * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.
+ * @param checkpointMetaData Meta data about this checkpoint
*
* @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
*/
- void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
+ void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index e95e7b3..ac14d3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -39,59 +40,32 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
private final CheckpointStateHandles checkpointStateHandles;
- /** The duration (in milliseconds) that the synchronous part of the checkpoint took */
- private final long synchronousDurationMillis;
-
- /** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
- private final long asynchronousDurationMillis;
-
- /** The number of bytes that were buffered during the checkpoint alignment phase */
- private final long bytesBufferedInAlignment;
-
- /** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
- private final long alignmentDurationNanos;
+ private final CheckpointMetaData checkpointMetaData;
// ------------------------------------------------------------------------
public AcknowledgeCheckpoint(
JobID job,
ExecutionAttemptID taskExecutionId,
- long checkpointId) {
- this(job, taskExecutionId, checkpointId, null);
+ CheckpointMetaData checkpointMetaData) {
+ this(job, taskExecutionId, checkpointMetaData, null);
}
public AcknowledgeCheckpoint(
JobID job,
ExecutionAttemptID taskExecutionId,
- long checkpointId,
+ CheckpointMetaData checkpointMetaData,
CheckpointStateHandles checkpointStateHandles) {
- this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L);
- }
- public AcknowledgeCheckpoint(
- JobID job,
- ExecutionAttemptID taskExecutionId,
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos) {
-
- super(job, taskExecutionId, checkpointId);
+ super(job, taskExecutionId, checkpointMetaData.getCheckpointId());
this.checkpointStateHandles = checkpointStateHandles;
-
+ this.checkpointMetaData = checkpointMetaData;
// these may be "-1", in case the values are unknown or not set
- checkArgument(synchronousDurationMillis >= -1);
- checkArgument(asynchronousDurationMillis >= -1);
- checkArgument(bytesBufferedInAlignment >= -1);
- checkArgument(alignmentDurationNanos >= -1);
-
- this.synchronousDurationMillis = synchronousDurationMillis;
- this.asynchronousDurationMillis = asynchronousDurationMillis;
- this.bytesBufferedInAlignment = bytesBufferedInAlignment;
- this.alignmentDurationNanos = alignmentDurationNanos;
+ checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1);
+ checkArgument(checkpointMetaData.getAsyncDurationMillis() >= -1);
+ checkArgument(checkpointMetaData.getBytesBufferedInAlignment() >= -1);
+ checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= -1);
}
// ------------------------------------------------------------------------
@@ -103,19 +77,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
}
public long getSynchronousDurationMillis() {
- return synchronousDurationMillis;
+ return checkpointMetaData.getSyncDurationMillis();
}
public long getAsynchronousDurationMillis() {
- return asynchronousDurationMillis;
+ return checkpointMetaData.getAsyncDurationMillis();
}
public long getBytesBufferedInAlignment() {
- return bytesBufferedInAlignment;
+ return checkpointMetaData.getBytesBufferedInAlignment();
}
public long getAlignmentDurationNanos() {
- return alignmentDurationNanos;
+ return checkpointMetaData.getAlignmentDurationNanos();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 8bf1127..6f1bf7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -41,18 +42,12 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- long checkpointID,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
- jobID, executionAttemptID, checkpointID,
- checkpointStateHandles,
- synchronousDurationMillis, asynchronousDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ jobID, executionAttemptID, checkpointMetaData,
+ checkpointStateHandles);
actorGateway.tell(message);
}
@@ -61,14 +56,13 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
public void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- long checkpointID,
- long checkpointTimestamp) {
+ CheckpointMetaData checkpointMetaData) {
DeclineCheckpoint decline = new DeclineCheckpoint(
jobID,
executionAttemptID,
- checkpointID,
- checkpointTimestamp);
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp());
actorGateway.tell(decline);
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 698a7f4..4fa20e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -34,40 +35,27 @@ public interface CheckpointResponder {
* Job ID of the running job
* @param executionAttemptID
* Execution attempt ID of the running task
- * @param checkpointID
- * Checkpoint ID of the checkpoint
- * @param checkpointStateHandles
+ * @param checkpointStateHandles
* State handles for the checkpoint
- * @param synchronousDurationMillis
- * The duration (in milliseconds) of the synchronous part of the operator checkpoint
- * @param asynchronousDurationMillis
- * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
- * @param bytesBufferedInAlignment
- * The number of bytes that were buffered during the checkpoint alignment phase
- * @param alignmentDurationNanos
- * The duration (in nanoseconds) that the stream alignment for the checkpoint took
+ * @param checkpointMetaData
+ * Meta data for this checkpoint
+ *
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- long checkpointID,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos);
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles);
/**
* Declines the given checkpoint.
*
* @param jobID Job ID of the running job
* @param executionAttemptID Execution attempt ID of the running task
- * @param checkpointID Checkpoint ID of the checkpoint
- * @param checkpointTimestamp Timestamp of the checkpoint
+ * @param checkpointMetaData Meta data for this checkpoint
*/
void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
- long checkpointID,
- long checkpointTimestamp);
+ CheckpointMetaData checkpointMetaData);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c2ba7ef..f6720e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -236,33 +237,20 @@ public class RuntimeEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos) {
-
- acknowledgeCheckpoint(checkpointId, null,
- synchronousDurationMillis, asynchronousDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
+
+ acknowledgeCheckpoint(checkpointMetaData, null);
}
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis,
- long asynchronousDurationMillis,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
checkpointResponder.acknowledgeCheckpoint(
- jobId, executionId, checkpointId,
- checkpointStateHandles,
- synchronousDurationMillis, asynchronousDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ jobId, executionId, checkpointMetaData,
+ checkpointStateHandles);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8463fa0..02a41b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -990,9 +991,12 @@ public class Task implements Runnable, TaskActions {
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
*/
- public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
+ public void triggerCheckpointBarrier(long checkpointID, long checkpointTimestamp) {
+
AbstractInvokable invokable = this.invokable;
+ final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
+
if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof StatefulTask) {
@@ -1004,9 +1008,9 @@ public class Task implements Runnable, TaskActions {
@Override
public void run() {
try {
- boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
+ boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
if (!success) {
- checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+ checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointMetaData);
}
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 972f0ea..6289fcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -305,15 +305,17 @@ public class CheckpointCoordinatorTest {
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
@@ -549,20 +551,22 @@ public class CheckpointCoordinatorTest {
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
}
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
@@ -593,8 +597,9 @@ public class CheckpointCoordinatorTest {
coord.triggerCheckpoint(timestampNew);
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+ CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -686,8 +691,10 @@ public class CheckpointCoordinatorTest {
verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+ CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
// start the second checkpoint
// trigger the second checkpoint. this should succeed
@@ -705,6 +712,8 @@ public class CheckpointCoordinatorTest {
}
long checkpointId2 = pending2.getCheckpointId();
+ CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
// trigger messages should have been sent
verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
@@ -713,10 +722,10 @@ public class CheckpointCoordinatorTest {
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
// now, the first checkpoint should be confirmed
assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -728,7 +737,7 @@ public class CheckpointCoordinatorTest {
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
// send the last remaining ack for the second checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
// now, the second checkpoint should be confirmed
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -821,8 +830,10 @@ public class CheckpointCoordinatorTest {
verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+ CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
+
// acknowledge one of the three tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
// start the second checkpoint
// trigger the second checkpoint. this should succeed
@@ -848,10 +859,12 @@ public class CheckpointCoordinatorTest {
// we acknowledge one more task from the first checkpoint and the second
// checkpoint completely. The second checkpoint should then subsume the first checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+ CheckpointMetaData checkpointMetaData2= new CheckpointMetaData(checkpointId2, 0L);
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
// now, the second checkpoint should be confirmed, and the first discarded
// actually both pending checkpoints are discarded, and the second has been transformed
@@ -875,7 +888,7 @@ public class CheckpointCoordinatorTest {
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
// send the last remaining ack for the first checkpoint. This should not do anything
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L)));
coord.shutdown();
}
@@ -931,7 +944,7 @@ public class CheckpointCoordinatorTest {
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
assertFalse(checkpoint.isDiscarded());
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(checkpoint.getCheckpointId(), 0L)));
// wait until the checkpoint must have expired.
// we check every 250 msecs conservatively for 5 seconds
@@ -1000,14 +1013,16 @@ public class CheckpointCoordinatorTest {
// of the vertices that need to be acknowledged.
// none of the messages should throw an exception
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
// wrong job id
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData));
// unknown checkpoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L)));
// unknown ack vertex
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
coord.shutdown();
}
@@ -1271,9 +1286,10 @@ public class CheckpointCoordinatorTest {
assertFalse(pending.canBeSubsumed());
assertTrue(pending instanceof PendingSavepoint);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
// acknowledge from one of the tasks
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertEquals(1, pending.getNumberOfAcknowledgedTasks());
assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
assertFalse(pending.isDiscarded());
@@ -1281,13 +1297,13 @@ public class CheckpointCoordinatorTest {
assertFalse(savepointFuture.isCompleted());
// acknowledge the same task again (should not matter)
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
assertFalse(pending.isDiscarded());
assertFalse(pending.isFullyAcknowledged());
assertFalse(savepointFuture.isCompleted());
// acknowledge the other task.
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
@@ -1319,9 +1335,11 @@ public class CheckpointCoordinatorTest {
savepointFuture = coord.triggerSavepoint(timestampNew);
assertFalse(savepointFuture.isCompleted());
+
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+ CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1386,6 +1404,7 @@ public class CheckpointCoordinatorTest {
// Trigger savepoint and checkpoint
Future<String> savepointFuture1 = coord.triggerSavepoint(timestamp);
long savepointId1 = counter.getLast();
+ CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertTrue(coord.triggerCheckpoint(timestamp + 1));
@@ -1395,9 +1414,11 @@ public class CheckpointCoordinatorTest {
long checkpointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
+ CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
+
// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData2));
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1410,11 +1431,12 @@ public class CheckpointCoordinatorTest {
Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
long savepointId2 = counter.getLast();
+ CheckpointMetaData checkpointMetaDataS2 = new CheckpointMetaData(savepointId2, 0L);
assertEquals(3, coord.getNumberOfPendingCheckpoints());
// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS2));
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1424,8 +1446,8 @@ public class CheckpointCoordinatorTest {
assertTrue(savepointFuture2.isCompleted());
// Ack first savepoint
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS1));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1488,7 +1510,7 @@ public class CheckpointCoordinatorTest {
.sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
// this should have immediately triggered a new checkpoint
now = System.currentTimeMillis();
@@ -1562,7 +1584,7 @@ public class CheckpointCoordinatorTest {
// now we acknowledge the second checkpoint, which should subsume the first checkpoint
// and allow two more checkpoints to be triggered
// now, once we acknowledge one checkpoint, it should trigger the next one
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L)));
// after a while, there should be the new checkpoints
final long newTimeout = System.currentTimeMillis() + 60000;
@@ -1690,7 +1712,7 @@ public class CheckpointCoordinatorTest {
// ACK all savepoints
long checkpointId = checkpointIDCounter.getLast();
for (int i = 0; i < numSavepoints; i++, checkpointId--) {
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 0L)));
}
// After ACKs, all should be completed
@@ -1789,6 +1811,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1802,7 +1825,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1816,7 +1839,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1892,6 +1915,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
@@ -1903,7 +1927,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1917,7 +1941,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2005,6 +2029,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2020,7 +2045,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2037,7 +2062,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2126,6 +2151,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
@@ -2142,7 +2168,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2158,7 +2184,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ checkpointMetaData,
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2227,7 +2253,7 @@ public class CheckpointCoordinatorTest {
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
+ new CheckpointMetaData(checkpointId, 0L),
checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index bb78b6a..c20c604 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -117,12 +117,12 @@ public class CheckpointStateRestoreTest {
final long checkpointId = pending.getCheckpointId();
CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -217,11 +217,13 @@ public class CheckpointStateRestoreTest {
// the difference to the test "testSetState" is that one stateful subtask does not report state
CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(serializedState, null, serializedKeyGroupStates);
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 38231ec..6100856 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -458,10 +459,10 @@ public class JobManagerHARecoveryTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
try {
ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
- InstantiationUtil.serializeObject(checkpointId));
+ InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
@@ -472,9 +473,8 @@ public class JobManagerHARecoveryTest {
new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList());
getEnvironment().acknowledgeCheckpoint(
- checkpointId,
- checkpointStateHandles,
- 0L, 0L, 0L, 0L);
+ new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+ checkpointStateHandles);
return true;
} catch (Exception ex) {
throw new RuntimeException(ex);
@@ -482,8 +482,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 4873335..305625e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -62,7 +63,7 @@ public class CheckpointMessagesTest {
public void testConfirmTaskCheckpointed() {
try {
AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
- new JobID(), new ExecutionAttemptID(), 569345L);
+ new JobID(), new ExecutionAttemptID(), new CheckpointMetaData(569345L, 0L));
KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
@@ -75,7 +76,7 @@ public class CheckpointMessagesTest {
AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
new JobID(),
new ExecutionAttemptID(),
- 87658976143L,
+ new CheckpointMetaData(87658976143L, 0L),
checkpointStateHandles);
testSerializabilityEqualsHashCode(noState);
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index c855230..bb07122 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -34,16 +34,13 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -154,18 +151,12 @@ public class DummyEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
}
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c3ed6c0..eb55c4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -45,11 +46,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
@@ -315,18 +312,12 @@ public class MockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
}
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7bc2c29..e2abe88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -36,19 +34,20 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
-
import org.apache.flink.util.SerializedValue;
import org.junit.Before;
import org.junit.Test;
@@ -217,9 +216,9 @@ public class TaskAsyncCallTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
lastCheckpointId++;
- if (checkpointId == lastCheckpointId) {
+ if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
if (lastCheckpointId == NUM_CALLS) {
triggerLatch.trigger();
}
@@ -234,8 +233,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4b6cac7..5b18c75 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -122,6 +123,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
*/
public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
this.topics = checkNotNull(topics);
+ checkArgument(topics.size() > 0, "You have to define at least one topic.");
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 76a69bc..6d2dc70 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -36,6 +36,7 @@ import org.mockito.Matchers;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -159,7 +160,7 @@ public class FlinkKafkaConsumerBaseTest {
assertFalse(listState.get().iterator().hasNext());
}
-
+
@Test
@SuppressWarnings("unchecked")
public void testSnapshotState() throws Exception {
@@ -177,9 +178,9 @@ public class FlinkKafkaConsumerBaseTest {
final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-
+
final LinkedMap pendingCheckpoints = new LinkedMap();
-
+
FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
assertEquals(0, pendingCheckpoints.size());
@@ -222,7 +223,7 @@ public class FlinkKafkaConsumerBaseTest {
assertEquals(state2, snapshot2);
assertEquals(2, pendingCheckpoints.size());
assertEquals(state2, pendingCheckpoints.get(140L));
-
+
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
assertEquals(1, pendingCheckpoints.size());
@@ -241,7 +242,7 @@ public class FlinkKafkaConsumerBaseTest {
assertEquals(state3, snapshot3);
assertEquals(2, pendingCheckpoints.size());
assertEquals(state3, pendingCheckpoints.get(141L));
-
+
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
assertEquals(0, pendingCheckpoints.size());
@@ -302,7 +303,7 @@ public class FlinkKafkaConsumerBaseTest {
@SuppressWarnings("unchecked")
public DummyFlinkKafkaConsumer() {
- super(Arrays.asList("abc", "def"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+ super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index d60c999..7f01129 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,20 +17,20 @@
package org.apache.flink.streaming.runtime.io;
-import java.io.IOException;
-import java.util.ArrayDeque;
-
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayDeque;
+
/**
* The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
@@ -219,9 +219,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
releaseBlocks();
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
- receivedBarrier.getId(), receivedBarrier.getTimestamp(),
- bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+ CheckpointMetaData checkpointMetaData =
+ new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ checkpointMetaData.
+ setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()).
+ setAlignmentDurationNanos(latestAlignmentDurationNanos);
+
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 1db5845..86945a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,10 +19,11 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.util.ArrayDeque;
@@ -115,8 +116,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
- receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+ CheckpointMetaData checkpointMetaData =
+ new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+
+ checkpointMetaData.
+ setBytesBufferedInAlignment(0L).
+ setAlignmentDurationNanos(0L);
+
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
}
return;
}
@@ -149,8 +156,13 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// notify the listener
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
- receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
+ CheckpointMetaData checkpointMetaData =
+ new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ checkpointMetaData.
+ setBytesBufferedInAlignment(0L).
+ setAlignmentDurationNanos(0L);
+
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
}
}
}
[2/4] flink git commit: [hotfix] Make KeyGroupsStateHandle implement
StreamStateHandle
Posted by uc...@apache.org.
[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee14309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee14309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee14309
Branch: refs/heads/master
Commit: fee143099a5ac7220622617c79abae60786555e4
Parents: 98710ea
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:15:48 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 2 +-
.../savepoint/SavepointV1Serializer.java | 2 +-
.../runtime/state/KeyGroupsStateHandle.java | 20 +++++++++++---------
.../state/heap/HeapKeyedStateBackend.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 7 +++----
5 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 126ebd2..7ab35c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -624,7 +624,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void restoreKeyGroupsInStateHandle()
throws IOException, RocksDBException, ClassNotFoundException {
try {
- currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+ currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 536062a..f120e1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -205,7 +205,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
for (int keyGroup : stateHandle.keyGroups()) {
dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
}
- serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+ serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
} else {
dos.writeByte(NULL_HANDLE);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index ea12808..1d277b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -28,7 +29,7 @@ import java.io.IOException;
* consists of a range of key group snapshots. A key group is subset of the available
* key space. The key groups are identified by their key group indices.
*/
-public class KeyGroupsStateHandle implements StateObject {
+public class KeyGroupsStateHandle implements StreamStateHandle {
private static final long serialVersionUID = -8070326169926626355L;
@@ -104,14 +105,6 @@ public class KeyGroupsStateHandle implements StateObject {
return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
}
- /**
- *
- * @return the inner stream state handle to the actual key-group states
- */
- public StreamStateHandle getStateHandle() {
- return stateHandle;
- }
-
@Override
public void discardState() throws Exception {
stateHandle.discardState();
@@ -123,6 +116,15 @@ public class KeyGroupsStateHandle implements StateObject {
}
@Override
+ public FSDataInputStream openInputStream() throws IOException {
+ return stateHandle.openInputStream();
+ }
+
+ public StreamStateHandle getDelegateStateHandle() {
+ return stateHandle;
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a766373..040677b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -259,7 +259,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
- fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
+ fsDataInputStream = keyGroupsHandle.openInputStream();
cancelStreamRegistry.registerClosable(fsDataInputStream);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5fb0e6f..972f0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2474,7 +2474,7 @@ public class CheckpointCoordinatorTest {
assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);
- try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.getStateHandle().openInputStream()) {
+ try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) {
long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
inputStream.seek(offset);
@@ -2483,9 +2483,8 @@ public class CheckpointCoordinatorTest {
for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
- try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.
- getStateHandle().openInputStream()) {
-
+ try (FSDataInputStream actualInputStream =
+ oneActualKeyGroupStateHandle.openInputStream()) {
actualInputStream.seek(actualOffset);
int actualGroupState = InstantiationUtil.
[3/4] flink git commit: [FLINK-4730] Introducing CheckpointMetaData
Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff14249..8ada6d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -46,9 +47,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -558,9 +559,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
try {
- return performCheckpoint(checkpointId, timestamp, 0L, 0L);
+ checkpointMetaData.
+ setBytesBufferedInAlignment(0L).
+ setAlignmentDurationNanos(0L);
+ return performCheckpoint(checkpointMetaData);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -573,11 +577,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
try {
- performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+ performCheckpoint(checkpointMetaData);
}
catch (CancelTaskException e) {
throw e;
@@ -587,8 +590,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(
- long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+
+ long checkpointId = checkpointMetaData.getCheckpointId();
+ long timestamp = checkpointMetaData.getTimestamp();
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
@@ -674,8 +679,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
- final long endOfSyncPart = System.nanoTime();
- final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+ final long syncEndNanos = System.nanoTime();
+ final long syncDurationMillis = (syncEndNanos - startOfSyncPart) / 1_000_000;
+
+ checkpointMetaData.setSyncDurationMillis(syncDurationMillis);
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
"checkpoint-" + checkpointId + "-" + timestamp,
@@ -684,11 +691,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
chainedNonPartitionedStateHandles,
chainedPartitionedStateHandles,
keyGroupsStateHandleFuture,
- checkpointId,
- bytesBufferedAlignment,
- alignmentDurationNanos,
- syncDurationMillis,
- endOfSyncPart);
+ checkpointMetaData,
+ syncEndNanos);
cancelables.registerClosable(asyncCheckpointRunnable);
asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -696,7 +700,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
- getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+ getName(), checkpointId, checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, syncDurationMillis);
}
return true;
@@ -914,15 +918,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture;
- private final long checkpointId;
-
private final String name;
- private final long bytesBufferedInAlignment;
-
- private final long alignmentDurationNanos;
-
- private final long syncDurationMillies;
+ private final CheckpointMetaData checkpointMetaData;
private final long asyncStartNanos;
@@ -933,11 +931,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
- long checkpointId,
- long bytesBufferedInAlignment,
- long alignmentDurationNanos,
- long syncDurationMillies,
- long asyncStartNanos) {
+ CheckpointMetaData checkpointMetaData,
+ long asyncStartNanos
+ ) {
this.name = name;
this.owner = owner;
@@ -945,10 +941,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.nonPartitionedStateHandles = nonPartitionedStateHandles;
this.partitioneableStateHandles = partitioneableStateHandles;
this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
- this.checkpointId = checkpointId;
- this.bytesBufferedInAlignment = bytesBufferedInAlignment;
- this.alignmentDurationNanos = alignmentDurationNanos;
- this.syncDurationMillies = syncDurationMillies;
+ this.checkpointMetaData = checkpointMetaData;
this.asyncStartNanos = asyncStartNanos;
}
@@ -974,26 +967,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
+ checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+
if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
- owner.getEnvironment().acknowledgeCheckpoint(
- checkpointId,
- syncDurationMillies, asyncDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
} else {
CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
nonPartitionedStateHandles,
partitioneableStateHandles,
keyedStates);
- owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
- allStateHandles,
- syncDurationMillies, asyncDurationMillis,
- bytesBufferedInAlignment, alignmentDurationNanos);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, allStateHandles);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
- owner.getName(), checkpointId, asyncDurationMillis);
+ owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 5d68841..2cabd70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -981,19 +982,17 @@ public class BarrierBufferTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp,
- long bytesAligned, long alignmentTimeNanos) throws Exception {
-
- assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
- assertTrue(timestamp > 0);
- assertTrue(bytesAligned >= 0);
- assertTrue(alignmentTimeNanos >= 0);
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+
+ assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
+ assertTrue(checkpointMetaData.getTimestamp() > 0);
+ assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
+ assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
nextExpectedCheckpointId++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index f2f9092..b6d0450 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -374,18 +375,16 @@ public class BarrierTrackerTest {
}
@Override
- public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(
- long checkpointId, long timestamp,
- long bytesAligned, long alignmentTimeNanos) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
- assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
- assertTrue(timestamp > 0);
+ assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointMetaData.getCheckpointId());
+ assertTrue(checkpointMetaData.getTimestamp() > 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index f6e7dca..1b2b723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -374,7 +375,9 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke(env);
testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
- while(!streamTask.triggerCheckpoint(checkpointId, checkpointTimestamp));
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
+
+ while(!streamTask.triggerCheckpoint(checkpointMetaData));
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
@@ -517,11 +520,10 @@ public class OneInputStreamTaskTest extends TestLogger {
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+ CheckpointMetaData checkpointMetaData,
+ CheckpointStateHandles checkpointStateHandles) {
- this.checkpointId = checkpointId;
+ this.checkpointId = checkpointMetaData.getCheckpointId();
if(checkpointStateHandles != null) {
this.state = checkpointStateHandles.getNonPartitionedStateHandles();
this.keyGroupStates = checkpointStateHandles.getKeyGroupsStateHandle();
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1e62e28..10fc400 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -29,11 +30,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
-
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -235,7 +234,8 @@ public class SourceStreamTaskTest {
public Boolean call() throws Exception {
for (int i = 0; i < numCheckpoints; i++) {
long currentCheckpointId = checkpointId.getAndIncrement();
- sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
+ sourceTask.triggerCheckpoint(checkpointMetaData);
Thread.sleep(checkpointInterval);
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/0dac7ad0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..f852682 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,18 +43,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -311,18 +308,12 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(
- long checkpointId,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) {
}
@Override
public void acknowledgeCheckpoint(
- long checkpointId,
- CheckpointStateHandles checkpointStateHandles,
- long synchronousDurationMillis, long asynchronousDurationMillis,
- long bytesBufferedInAlignment, long alignmentDurationNanos) {
+ CheckpointMetaData checkpointMetaData, CheckpointStateHandles checkpointStateHandles) {
}
@Override