You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:14 UTC
[flink] 05/16: [hotfix][network] Do not abort the same checkpoint
barrier twice when cancellation marker was lost
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62927c90c5a8aa79dfc68b6bb4f05f4aeebcd2b4
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat Jun 15 10:50:24 2019 +0200
[hotfix][network] Do not abort the same checkpoint barrier twice when cancellation marker was lost
---
.../flink/streaming/runtime/io/BarrierBuffer.java | 12 +--
.../runtime/io/BarrierBufferTestBase.java | 19 +++++
.../streaming/runtime/io/BarrierTrackerTest.java | 55 -------------
.../runtime/io/CheckpointSequenceValidator.java | 90 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 64 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 23717f5..0f8fa40 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -330,12 +330,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
startOfAlignmentTimestamp = 0L;
latestAlignmentDurationNanos = 0L;
- notifyAbort(currentCheckpointId,
- new CheckpointException(
- "Barrier id: " + barrierId,
- CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED
- ));
-
notifyAbortOnCancellationBarrier(barrierId);
}
@@ -380,11 +374,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+ new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
- .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
- .setAlignmentDurationNanos(latestAlignmentDurationNanos);
+ .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
+ .setAlignmentDurationNanos(latestAlignmentDurationNanos);
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
checkpointMetaData,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 3b4f65f..e6e48a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -689,6 +689,25 @@ public abstract class BarrierBufferTestBase {
}
@Test
+ public void testMissingCancellationBarriers() throws Exception {
+ BufferOrEvent[] sequence = {
+ createBarrier(1L, 0),
+ createCancellationBarrier(2L, 0),
+ createCancellationBarrier(3L, 0),
+ createCancellationBarrier(3L, 1),
+ createBuffer(0)
+ };
+ AbstractInvokable expectedNotifications = new CheckpointSequenceValidator(-3); // negative id: the only permitted notification is an abort of checkpoint 3
+ buffer = createBarrierBuffer(2, sequence, expectedNotifications);
+ for (BufferOrEvent expected : sequence) {
+ boolean isCheckpointEvent = !expected.isBuffer() && (expected.getEvent().getClass() == CheckpointBarrier.class || expected.getEvent().getClass() == CancelCheckpointMarker.class);
+ if (!isCheckpointEvent) {
+ assertEquals(expected, buffer.pollNext().get());
+ }
+ }
+ }
+
+ @Test
public void testEarlyCleanup() throws Exception {
BufferOrEvent[] sequence = {
createBuffer(0), createBuffer(1), createBuffer(2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index cb58837..1be2aab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -28,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.junit.After;
import org.junit.Test;
@@ -40,7 +37,6 @@ import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -404,55 +400,4 @@ public class BarrierTrackerTest {
// Testing Mocks
// ------------------------------------------------------------------------
- private static class CheckpointSequenceValidator extends AbstractInvokable {
-
- private final long[] checkpointIDs;
-
- private int i = 0;
-
- private CheckpointSequenceValidator(long... checkpointIDs) {
- super(new DummyEnvironment("test", 1, 0));
- this.checkpointIDs = checkpointIDs;
- }
-
- @Override
- public void invoke() {
- throw new UnsupportedOperationException("should never be called");
- }
-
- @Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
- throw new UnsupportedOperationException("should never be called");
- }
-
- @Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
- assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-
- final long expectedId = checkpointIDs[i++];
- if (expectedId >= 0) {
- assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId());
- assertTrue(checkpointMetaData.getTimestamp() > 0);
- } else {
- fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
- }
- }
-
- @Override
- public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
- assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-
- final long expectedId = checkpointIDs[i++];
- if (expectedId < 0) {
- assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
- } else {
- fail("got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'");
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- throw new UnsupportedOperationException("should never be called");
- }
- }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
new file mode 100644
index 0000000..83b5364
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@link AbstractInvokable} that validates the expected order of triggered and aborted checkpoints.
+ *
+ * <p>The expected sequence is a list of checkpoint ids: a non-negative entry {@code n} expects a
+ * {@link #triggerCheckpointOnBarrier} call for checkpoint {@code n}, while a negative entry {@code -n}
+ * expects an {@link #abortCheckpointOnBarrier} call for checkpoint {@code n} (so an abort of
+ * checkpoint 0 cannot be expressed). Any notification beyond the end of the sequence fails the test;
+ * receiving fewer notifications than expected is not detected by this class.
+ */
+class CheckpointSequenceValidator extends AbstractInvokable {
+
+ /** Expected checkpoint ids, in order; negative values denote expected aborts. */
+ private final long[] checkpointIDs;
+
+ /** Position in {@link #checkpointIDs} of the next expected notification. */
+ private int nextIndex = 0;
+
+ /**
+ * Creates a validator expecting the given sequence of checkpoint notifications.
+ *
+ * @param checkpointIDs expected ids in order; a negative id {@code -n} expects an abort of checkpoint {@code n}
+ */
+ CheckpointSequenceValidator(long... checkpointIDs) {
+ super(new DummyEnvironment("test", 1, 0));
+ this.checkpointIDs = checkpointIDs;
+ }
+
+ @Override
+ public void invoke() {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ /** Consumes the next expected entry and checks it expects a trigger of this checkpoint with a positive timestamp. */
+ @Override
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+ assertTrue("Unexpected triggerCheckpointOnBarrier(" + checkpointMetaData.getCheckpointId() + ")", nextIndex < checkpointIDs.length);
+
+ final long expectedId = checkpointIDs[nextIndex++];
+ if (expectedId >= 0) {
+ assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId());
+ assertTrue(
+ "checkpoint timestamp should be positive but was " + checkpointMetaData.getTimestamp(),
+ checkpointMetaData.getTimestamp() > 0);
+ } else {
+ // A negative entry encodes an expected abort: report the (positive) checkpoint id that was expected.
+ fail(String.format(
+ "got 'triggerCheckpointOnBarrier(%d)' when expecting an 'abortCheckpointOnBarrier(%d)'",
+ checkpointMetaData.getCheckpointId(),
+ -expectedId));
+ }
+ }
+
+ /** Consumes the next expected entry and checks that it expects an abort of this checkpoint. */
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
+ assertTrue("Unexpected abortCheckpointOnBarrier(" + checkpointId + ")", nextIndex < checkpointIDs.length);
+
+ final long expectedId = checkpointIDs[nextIndex++];
+ if (expectedId < 0) {
+ assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
+ } else {
+ fail(String.format(
+ "got 'abortCheckpointOnBarrier(%d)' when expecting a 'triggerCheckpointOnBarrier(%d)'",
+ checkpointId,
+ expectedId));
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+}