Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:41:10 UTC

[flink] branch release-1.11 updated (6a4714f -> 8062469)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6a4714f  [FLINK-17794][tests] Tear down installed software in reverse order
     new 2da9ac4  [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear
     new 4492ac5  [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
     new bb64296  [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
     new 8062469  [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../checkpoint/channel/ChannelStateWriterImpl.java    | 19 ++++++++++---------
 .../io/network/api/serialization/SpanningWrapper.java | 14 ++++++++++++--
 .../planner/runtime/stream/sql/AggregateITCase.scala  |  2 +-
 .../flink/test/classloading/ClassLoaderITCase.java    |  4 +++-
 .../test/classloading/jar/CustomKvStateProgram.java   |  5 ++++-
 5 files changed, 30 insertions(+), 14 deletions(-)


[flink] 02/04: [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4492ac53af74a6a591f6b87a869742df11e7e576
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 14 20:34:22 2020 +0200

    [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
    
    This test needs many concurrent checkpoints & savepoints, which is currently not supported with unaligned checkpoints.
---
 .../java/org/apache/flink/test/classloading/ClassLoaderITCase.java   | 4 +++-
 .../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
 				String.valueOf(parallelism),
 				checkpointDir.toURI().toString(),
 				"5000",
-				outputDir.toURI().toString()})
+				outputDir.toURI().toString(),
+				"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+			})
 			.build();
 
 		TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
 		final String checkpointPath = args[1];
 		final int checkpointingInterval = Integer.parseInt(args[2]);
 		final String outputPath = args[3];
+		final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-				env.enableCheckpointing(checkpointingInterval);
+		env.enableCheckpointing(checkpointingInterval);
+		unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());
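
For context, the extra "false" argument threaded through the test above ends up on Flink's CheckpointConfig. A minimal standalone sketch of toggling unaligned checkpoints, assuming the Flink 1.11 streaming API (the interval mirrors the "5000" above; the class name is illustrative, not from the patch):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnalignedCheckpointToggle {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 5 seconds (illustrative interval).
            env.enableCheckpointing(5000);
            // Disable unaligned checkpoints: jobs triggering many concurrent
            // savepoints/checkpoints are not supported in unaligned mode.
            env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        }
    }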


[flink] 04/04: [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8062469f2b4d2d6b82df11514208a1cff37badca
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sun May 17 16:15:01 2020 +0200

    [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator
---
 .../checkpoint/channel/ChannelStateWriterImpl.java    | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3f56b15..fbb8ebc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
 			return result;
 		});
-		Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
+		Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
 	}
 
 	@Override
 	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
 		LOG.debug(
-			"{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
+			"{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	@Override
 	public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
 		LOG.debug(
-			"{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
+			"{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
 			taskName,
 			checkpointId,
 			info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void finishInput(long checkpointId) {
-		LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeInput(checkpointId), false);
 	}
 
 	@Override
 	public void finishOutput(long checkpointId) {
-		LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
 		enqueue(completeOutput(checkpointId), false);
 	}
 
 	@Override
 	public void abort(long checkpointId, Throwable cause) {
-		LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
 		results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public ChannelStateWriteResult getWriteResult(long checkpointId) {
-		LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
 		ChannelStateWriteResult result = results.get(checkpointId);
-		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
+		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
 		return result;
 	}
 
 	@Override
 	public void stop(long checkpointId) {
-		LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
+		LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
 		results.remove(checkpointId);
 	}
 
@@ -172,6 +172,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 
 	@Override
 	public void close() throws IOException {
+		LOG.debug("close, dropping checkpoints {}", results.keySet());
 		results.clear();
 		executor.close();
 	}
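
The unification above is purely about the SLF4J message template: every call site now uses the same "checkpoint {}" placeholder wording as CheckpointCoordinator, instead of "checkpoint id: {}". A minimal sketch of the pattern (class and field names here are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class CheckpointLogging {
        private static final Logger LOG = LoggerFactory.getLogger(CheckpointLogging.class);
        private final String taskName;

        CheckpointLogging(String taskName) {
            this.taskName = taskName;
        }

        void finishInput(long checkpointId) {
            // Same wording as CheckpointCoordinator ("checkpoint {}"),
            // so log lines grep consistently across components.
            LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
        }
    }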


[flink] 03/04: [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb64296c7a28ac2e51a741082b0d9c6de037de69
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat May 16 15:00:19 2020 +0200

    [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
    
    The result of this query doesn't guarantee any ordering. This breaks when unaligned checkpoints are enabled.
---
 .../apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b5801..1a9485d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
     val expected = List("1", "3")
-    assertEquals(expected, sink.getRetractResults)
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
   @Test
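
The one-line fix applies the usual pattern for asserting on a streaming result whose order is not guaranteed: sort both the expected and the actual values before comparing. A small Java sketch of the same idea (the test itself is Scala; names and values here are illustrative):

    import static org.junit.Assert.assertEquals;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class OrderInsensitiveAssert {
        static void assertSameElements(List<String> expected, List<String> actual) {
            // Sort copies of both sides so the comparison ignores arrival
            // order, which a parallel streaming job does not guarantee.
            List<String> e = new ArrayList<>(expected);
            List<String> a = new ArrayList<>(actual);
            Collections.sort(e);
            Collections.sort(a);
            assertEquals(e, a);
        }

        public static void main(String[] args) {
            assertSameElements(Arrays.asList("1", "3"), Arrays.asList("3", "1")); // passes
        }
    }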


[flink] 01/04: [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2da9ac4b2108ec13ac97a8aff90c43449b2efb4e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed May 20 15:17:24 2020 +0200

    [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear
    
    For some reason the following commit:
    54155744bd [FLINK-17547][task] Use RefCountedFile in SpanningWrapper
    
    caused a performance regression in various benchmarks. It's hard to tell
    why, as none of the benchmarks use spill files (the records are too
    small), so our best guess is that the combination of the AtomicInteger
    inside RefCountedFile and the NullPointerException handling interfered
    with the JIT's ability to eliminate the memory barrier (from the
    AtomicInteger) on the hot path.
---
 .../io/network/api/serialization/SpanningWrapper.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 45d6ad7..7e3a2da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -50,7 +50,7 @@ import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningW
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.apache.flink.util.CloseableIterator.empty;
 import static org.apache.flink.util.FileUtils.writeCompletely;
-import static org.apache.flink.util.IOUtils.closeAllQuietly;
+import static org.apache.flink.util.IOUtils.closeQuietly;
 
 final class SpanningWrapper {
 
@@ -249,7 +249,17 @@ final class SpanningWrapper {
 		leftOverLimit = 0;
 		accumulatedRecordBytes = 0;
 
-		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
+		if (spillingChannel != null) {
+			closeQuietly(spillingChannel);
+		}
+		if (spillFileReader != null) {
+			closeQuietly(spillFileReader);
+		}
+		if (spillFile != null) {
+			// It's important to avoid AtomicInteger access inside `release()` on the hot path
+			closeQuietly(() -> spillFile.release());
+		}
+
 		spillingChannel = null;
 		spillFileReader = null;
 		spillFile = null;
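
The fix replaces the varargs closeAllQuietly helper with explicit null-checked closes, so the common no-spill path does the minimum possible work. A simplified sketch of the pattern (resource handling reduced to plain Closeable; the field names mirror the patch but the class is illustrative):

    import java.io.Closeable;
    import java.io.IOException;

    class NullCheckedClose {
        // Simplified stand-in for org.apache.flink.util.IOUtils#closeQuietly.
        static void closeQuietly(Closeable c) {
            try {
                c.close();
            } catch (IOException ignored) {
                // Quiet close: exceptions during cleanup are swallowed.
            }
        }

        private Closeable spillingChannel;   // usually null: records too small to spill
        private Closeable spillFileReader;   // usually null

        void clear() {
            // Only touch resources that exist; on the hot path (no spilling)
            // these branches reduce to cheap null checks, with no lambda
            // allocation and no AtomicInteger access inside a release() call.
            if (spillingChannel != null) {
                closeQuietly(spillingChannel);
            }
            if (spillFileReader != null) {
                closeQuietly(spillFileReader);
            }
            spillingChannel = null;
            spillFileReader = null;
        }
    }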