You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:41:10 UTC
[flink] branch release-1.11 updated (6a4714f -> 8062469)
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 6a4714f [FLINK-17794][tests] Tear down installed software in reverse order
new 2da9ac4 [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear
new 4492ac5 [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
new bb64296 [FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
new 8062469 [FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../checkpoint/channel/ChannelStateWriterImpl.java | 19 ++++++++++---------
.../io/network/api/serialization/SpanningWrapper.java | 14 ++++++++++++--
.../planner/runtime/stream/sql/AggregateITCase.scala | 2 +-
.../flink/test/classloading/ClassLoaderITCase.java | 4 +++-
.../test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
5 files changed, 30 insertions(+), 14 deletions(-)
[flink] 02/04: [FLINK-17258][network][test] Run
ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned
checkpoints
Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4492ac53af74a6a591f6b87a869742df11e7e576
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 14 20:34:22 2020 +0200
[FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
This test needs many concurrent checkpoints & savepoints, and this is currently not supported with unaligned checkpoints.
---
.../java/org/apache/flink/test/classloading/ClassLoaderITCase.java | 4 +++-
.../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
- outputDir.toURI().toString()})
+ outputDir.toURI().toString(),
+ "false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+ })
.build();
TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.util.Collector;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
final String checkpointPath = args[1];
final int checkpointingInterval = Integer.parseInt(args[2]);
final String outputPath = args[3];
+ final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
- env.enableCheckpointing(checkpointingInterval);
+ env.enableCheckpointing(checkpointingInterval);
+ unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
env.setStateBackend(new FsStateBackend(checkpointPath));
DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());
[flink] 04/04: [FLINK-17258][hotfix][network] Unify checkpoint id
logging with CheckpointCoordinator
Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8062469f2b4d2d6b82df11514208a1cff37badca
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sun May 17 16:15:01 2020 +0200
[FLINK-17258][hotfix][network] Unify checkpoint id logging with CheckpointCoordinator
---
.../checkpoint/channel/ChannelStateWriterImpl.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3f56b15..fbb8ebc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
return result;
});
- Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
+ Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
}
@Override
public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
LOG.debug(
- "{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
+ "{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
taskName,
checkpointId,
info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
LOG.debug(
- "{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
+ "{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
taskName,
checkpointId,
info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void finishInput(long checkpointId) {
- LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
+ LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
enqueue(completeInput(checkpointId), false);
}
@Override
public void finishOutput(long checkpointId) {
- LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
+ LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
enqueue(completeOutput(checkpointId), false);
}
@Override
public void abort(long checkpointId, Throwable cause) {
- LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
+ LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public ChannelStateWriteResult getWriteResult(long checkpointId) {
- LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
+ LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
ChannelStateWriteResult result = results.get(checkpointId);
- Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
+ Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
return result;
}
@Override
public void stop(long checkpointId) {
- LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
+ LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
results.remove(checkpointId);
}
@@ -172,6 +172,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void close() throws IOException {
+ LOG.debug("close, dropping checkpoints {}", results.keySet());
results.clear();
executor.close();
}
[flink] 03/04: [FLINK-17258][sql][test] Fix
AggregateITCase.testPruneUselessAggCall missing sorted
Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb64296c7a28ac2e51a741082b0d9c6de037de69
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat May 16 15:00:19 2020 +0200
[FLINK-17258][sql][test] Fix AggregateITCase.testPruneUselessAggCall missing sorted
The result of this query doesn't guarantee any ordering. This breaks when unaligned checkpoints are enabled.
---
.../apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 73b5801..1a9485d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
t1.toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
val expected = List("1", "3")
- assertEquals(expected, sink.getRetractResults)
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
@Test
[flink] 01/04: [FLINK-17842][network] Fix performance regression in
SpanningWrapper#clear
Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2da9ac4b2108ec13ac97a8aff90c43449b2efb4e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed May 20 15:17:24 2020 +0200
[FLINK-17842][network] Fix performance regression in SpanningWrapper#clear
For some reason the following commit:
54155744bd [FLINK-17547][task] Use RefCountedFile in SpanningWrapper
caused a performance regression in various benchmarks. It's hard to tell why
as none of the benchmarks are using spill files (records are too small), so
our best guess is that the combination of AtomicInteger inside RefCountedFile
plus NullPointerException handling interfered with the JIT's ability to get rid
of the memory barrier (from AtomicInteger) on the hot path.
---
.../io/network/api/serialization/SpanningWrapper.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 45d6ad7..7e3a2da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -50,7 +50,7 @@ import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningW
import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
import static org.apache.flink.util.CloseableIterator.empty;
import static org.apache.flink.util.FileUtils.writeCompletely;
-import static org.apache.flink.util.IOUtils.closeAllQuietly;
+import static org.apache.flink.util.IOUtils.closeQuietly;
final class SpanningWrapper {
@@ -249,7 +249,17 @@ final class SpanningWrapper {
leftOverLimit = 0;
accumulatedRecordBytes = 0;
- closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
+ if (spillingChannel != null) {
+ closeQuietly(spillingChannel);
+ }
+ if (spillFileReader != null) {
+ closeQuietly(spillFileReader);
+ }
+ if (spillFile != null) {
+ // It's important to avoid AtomicInteger access inside `release()` on the hot path
+ closeQuietly(() -> spillFile.release());
+ }
+
spillingChannel = null;
spillFileReader = null;
spillFile = null;