You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:38:06 UTC

[flink] branch master updated: Merge pull request #12244 from pnowojski/f17258

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 463dc8b  Merge pull request #12244 from pnowojski/f17258
463dc8b is described below

commit 463dc8b9cab99d6d5df07ec0e593843885e3ac72
Author: Piotr Nowojski <pn...@users.noreply.github.com>
AuthorDate: Thu May 21 19:37:14 2020 +0200

    Merge pull request #12244 from pnowojski/f17258
    
    [FLINK-17258][network] Fix couple of ITCases that were failing with enabled unaligned checkpoints
---
 .../checkpoint/channel/ChannelStateWriterImpl.java    | 19 ++++++++++---------
 .../planner/runtime/stream/sql/AggregateITCase.scala  |  2 +-
 .../flink/test/classloading/ClassLoaderITCase.java    |  4 +++-
 .../test/classloading/jar/CustomKvStateProgram.java   |  5 ++++-
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3f56b15..fbb8ebc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
 			return result;
 		});
-		Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
+		Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
 	}
 
 	@Override
 	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		LOG.debug(
-			"{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
+			"{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	@Override
 	public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
 		LOG.debug(
-			"{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
+			"{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void finishInput(long checkpointId) {
-		LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeInput(checkpointId), false);
 	}
 
 	@Override
 	public void finishOutput(long checkpointId) {
-		LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeOutput(checkpointId), false);
 	}
 
 	@Override
 	public void abort(long checkpointId, Throwable cause) {
-		LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
 		results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public ChannelStateWriteResult getWriteResult(long checkpointId) {
-		LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
 		ChannelStateWriteResult result = results.get(checkpointId);
-		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
+		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
 		return result;
 	}
 
 	@Override
 	public void stop(long checkpointId) {
-		LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
 		results.remove(checkpointId);
 	}
 
@@ -172,6 +172,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void close() throws IOException {
+		LOG.debug("close, dropping checkpoints {}", results.keySet());
 		results.clear();
 		executor.close();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b5801..1a9485d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
     val expected = List("1", "3")
-    assertEquals(expected, sink.getRetractResults)
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
   @Test
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
 				String.valueOf(parallelism),
 				checkpointDir.toURI().toString(),
 				"5000",
-				outputDir.toURI().toString()})
+				outputDir.toURI().toString(),
+				"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+			})
 			.build();
 
 		TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
 		final String checkpointPath = args[1];
 		final int checkpointingInterval = Integer.parseInt(args[2]);
 		final String outputPath = args[3];
+		final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-				env.enableCheckpointing(checkpointingInterval);
+		env.enableCheckpointing(checkpointingInterval);
+		unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());