You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:39:35 UTC

[flink] branch master updated (463dc8b -> 303e008)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 463dc8b  Merge pull request #12244 from pnowojski/f17258
     new 840a4d9  Revert "Merge pull request #12244 from pnowojski/f17258"
     new b3a7851  [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
     new 05aa2d2  [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
     new 303e008  [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[flink] 03/04: [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 05aa2d26d2dce3e6090ca59443f969afd28625cc
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat May 16 15:00:19 2020 +0200

    [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
    
    The result of this query doesn't guarantee any ordering. This brakes with enabled unaligned checkpoints.
---
 .../apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b5801..1a9485d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
     val expected = List("1", "3")
-    assertEquals(expected, sink.getRetractResults)
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
   @Test


[flink] 01/04: Revert "Merge pull request #12244 from pnowojski/f17258"

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 840a4d954233f8769a54ac2ada5d34a5ef80c82d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 21 19:38:35 2020 +0200

    Revert "Merge pull request #12244 from pnowojski/f17258"
    
    This reverts commit 463dc8b9cab99d6d5df07ec0e593843885e3ac72.
---
 .../checkpoint/channel/ChannelStateWriterImpl.java    | 19 +++++++++----------
 .../planner/runtime/stream/sql/AggregateITCase.scala  |  2 +-
 .../flink/test/classloading/ClassLoaderITCase.java    |  4 +---
 .../test/classloading/jar/CustomKvStateProgram.java   |  5 +----
 4 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fbb8ebc..3f56b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
 			return result;
 		});
-		Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
+		Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
 	}
 
 	@Override
 	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		LOG.debug(
-			"{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
+			"{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	@Override
 	public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
 		LOG.debug(
-			"{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
+			"{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void finishInput(long checkpointId) {
-		LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
+		LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
 		enqueue(completeInput(checkpointId), false);
 	}
 
 	@Override
 	public void finishOutput(long checkpointId) {
-		LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
+		LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
 		enqueue(completeOutput(checkpointId), false);
 	}
 
 	@Override
 	public void abort(long checkpointId, Throwable cause) {
-		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
+		LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
 		results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public ChannelStateWriteResult getWriteResult(long checkpointId) {
-		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
+		LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
 		ChannelStateWriteResult result = results.get(checkpointId);
-		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
+		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
 		return result;
 	}
 
 	@Override
 	public void stop(long checkpointId) {
-		LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
+		LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
 		results.remove(checkpointId);
 	}
 
@@ -172,7 +172,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void close() throws IOException {
-		LOG.debug("close, dropping checkpoints {}", results.keySet());
 		results.clear();
 		executor.close();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 1a9485d..73b5801 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
     val expected = List("1", "3")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+    assertEquals(expected, sink.getRetractResults)
   }
 
   @Test
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 974512c..6ea054a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,9 +313,7 @@ public class ClassLoaderITCase extends TestLogger {
 				String.valueOf(parallelism),
 				checkpointDir.toURI().toString(),
 				"5000",
-				outputDir.toURI().toString(),
-				"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
-			})
+				outputDir.toURI().toString()})
 			.build();
 
 		TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index d6f4aa1..954b8df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
-import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -48,12 +47,10 @@ public class CustomKvStateProgram {
 		final String checkpointPath = args[1];
 		final int checkpointingInterval = Integer.parseInt(args[2]);
 		final String outputPath = args[3];
-		final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-		env.enableCheckpointing(checkpointingInterval);
-		unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
+				env.enableCheckpointing(checkpointingInterval);
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());


[flink] 04/04: [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 303e00800ea90a04e1471122bb8ca19ee38d94b8
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sun May 17 16:15:01 2020 +0200

    [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator
---
 .../checkpoint/channel/ChannelStateWriterImpl.java    | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3f56b15..fbb8ebc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
 			return result;
 		});
-		Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
+		Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
 	}
 
 	@Override
 	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		LOG.debug(
-			"{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
+			"{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	@Override
 	public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
 		LOG.debug(
-			"{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
+			"{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void finishInput(long checkpointId) {
-		LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeInput(checkpointId), false);
 	}
 
 	@Override
 	public void finishOutput(long checkpointId) {
-		LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeOutput(checkpointId), false);
 	}
 
 	@Override
 	public void abort(long checkpointId, Throwable cause) {
-		LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
 		results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public ChannelStateWriteResult getWriteResult(long checkpointId) {
-		LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
 		ChannelStateWriteResult result = results.get(checkpointId);
-		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
+		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
 		return result;
 	}
 
 	@Override
 	public void stop(long checkpointId) {
-		LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
 		results.remove(checkpointId);
 	}
 
@@ -172,6 +172,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void close() throws IOException {
+		LOG.debug("close, dropping checkpoints {}", results.keySet());
 		results.clear();
 		executor.close();
 	}


[flink] 02/04: [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3a78515870d3a9b461130b6cfc92e72412ed209
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 14 20:34:22 2020 +0200

    [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
    
    This test needs many concurent checkpoint & savepoints and this is currently not supported with unaligned checkpoints.
---
 .../java/org/apache/flink/test/classloading/ClassLoaderITCase.java   | 4 +++-
 .../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
 				String.valueOf(parallelism),
 				checkpointDir.toURI().toString(),
 				"5000",
-				outputDir.toURI().toString()})
+				outputDir.toURI().toString(),
+				"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+			})
 			.build();
 
 		TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
 		final String checkpointPath = args[1];
 		final int checkpointingInterval = Integer.parseInt(args[2]);
 		final String outputPath = args[3];
+		final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-				env.enableCheckpointing(checkpointingInterval);
+		env.enableCheckpointing(checkpointingInterval);
+		unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());