You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:14 UTC

[flink] 05/16: [hotfix][network] Do not abort the same checkpoint barrier twice when cancellation marker was lost

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62927c90c5a8aa79dfc68b6bb4f05f4aeebcd2b4
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat Jun 15 10:50:24 2019 +0200

    [hotfix][network] Do not abort the same checkpoint barrier twice when cancellation marker was lost
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 12 +--
 .../runtime/io/BarrierBufferTestBase.java          | 19 +++++
 .../streaming/runtime/io/BarrierTrackerTest.java   | 55 -------------
 .../runtime/io/CheckpointSequenceValidator.java    | 90 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 64 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 23717f5..0f8fa40 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -330,12 +330,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				startOfAlignmentTimestamp = 0L;
 				latestAlignmentDurationNanos = 0L;
 
-				notifyAbort(currentCheckpointId,
-					new CheckpointException(
-						"Barrier id: " + barrierId,
-						CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED
-					));
-
 				notifyAbortOnCancellationBarrier(barrierId);
 			}
 
@@ -380,11 +374,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
 			CheckpointMetaData checkpointMetaData =
-					new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+				new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
 
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
-					.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
-					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
+				.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
+				.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
 			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
 				checkpointMetaData,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 3b4f65f..e6e48a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -689,6 +689,25 @@ public abstract class BarrierBufferTestBase {
 	}
 
 	@Test
+	public void testMissingCancellationBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+			createBarrier(1L, 0),
+			createCancellationBarrier(2L, 0),
+			createCancellationBarrier(3L, 0),
+			createCancellationBarrier(3L, 1),
+			createBuffer(0)
+		};
+		AbstractInvokable validator = new CheckpointSequenceValidator(-3); // the only callback allowed is a single abortCheckpointOnBarrier(3); a second abort fails the validator
+		buffer = createBarrierBuffer(2, sequence, validator); // NOTE(review): nothing below asserts that the abort of checkpoint 3 was actually delivered
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { // barriers and cancellation markers are not expected as output; everything else must come out in sequence order
+				assertEquals(boe, buffer.pollNext().get());
+			}
+		}
+	}
+
+	@Test
 	public void testEarlyCleanup() throws Exception {
 		BufferOrEvent[] sequence = {
 			createBuffer(0), createBuffer(1), createBuffer(2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index cb58837..1be2aab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -28,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 
 import org.junit.After;
 import org.junit.Test;
@@ -40,7 +37,6 @@ import java.util.Arrays;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -404,55 +400,4 @@ public class BarrierTrackerTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class CheckpointSequenceValidator extends AbstractInvokable {
-
-		private final long[] checkpointIDs;
-
-		private int i = 0;
-
-		private CheckpointSequenceValidator(long... checkpointIDs) {
-			super(new DummyEnvironment("test", 1, 0));
-			this.checkpointIDs = checkpointIDs;
-		}
-
-		@Override
-		public void invoke() {
-			throw new UnsupportedOperationException("should never be called");
-		}
-
-		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
-			throw new UnsupportedOperationException("should never be called");
-		}
-
-		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
-			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-
-			final long expectedId = checkpointIDs[i++];
-			if (expectedId >= 0) {
-				assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId());
-				assertTrue(checkpointMetaData.getTimestamp() > 0);
-			} else {
-				fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
-			}
-		}
-
-		@Override
-		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
-			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-
-			final long expectedId = checkpointIDs[i++];
-			if (expectedId < 0) {
-				assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
-			} else {
-				fail("got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'");
-			}
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			throw new UnsupportedOperationException("should never be called");
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
new file mode 100644
index 0000000..83b5364
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@link AbstractInvokable} that validates the expected order of triggered ({@code id >= 0}) and aborted ({@code -id}) checkpoints.
+ */
+class CheckpointSequenceValidator extends AbstractInvokable {
+
+	private final long[] checkpointIDs; // expected callbacks in order: x >= 0 means triggerCheckpointOnBarrier(x), x < 0 means abortCheckpointOnBarrier(-x)
+
+	private int i = 0; // index of the next entry of checkpointIDs to be matched
+
+	CheckpointSequenceValidator(long... checkpointIDs) {
+		super(new DummyEnvironment("test", 1, 0));
+		this.checkpointIDs = checkpointIDs;
+	}
+
+	@Override
+	public void invoke() {
+		throw new UnsupportedOperationException("should never be called");
+	}
+
+	@Override
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
+		throw new UnsupportedOperationException("should never be called");
+	}
+
+	@Override
+	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+		assertTrue("Unexpected triggerCheckpointOnBarrier(" + checkpointMetaData.getCheckpointId() + ")", i < checkpointIDs.length);
+
+		final long expectedId = checkpointIDs[i++];
+		if (expectedId >= 0) {
+			assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId());
+			assertTrue(checkpointMetaData.getTimestamp() > 0);
+		} else {
+			fail(String.format(
+				"got 'triggerCheckpointOnBarrier(%d)' when expecting an 'abortCheckpointOnBarrier(%d)'",
+				checkpointMetaData.getCheckpointId(),
+				-expectedId)); // abort entries are stored negated; report the real checkpoint id
+		}
+	}
+
+	@Override
+	public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
+		assertTrue("Unexpected abortCheckpointOnBarrier(" + checkpointId + ")", i < checkpointIDs.length);
+
+		final long expectedId = checkpointIDs[i++];
+		if (expectedId < 0) {
+			assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
+		} else {
+			fail(String.format(
+				"got 'abortCheckpointOnBarrier(%d)' when expecting a 'triggerCheckpointOnBarrier(%d)'",
+				checkpointId,
+				expectedId));
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		throw new UnsupportedOperationException("should never be called");
+	}
+}