You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:39:36 UTC
[flink] 01/04: Revert "Merge pull request #12244 from pnowojski/f17258"
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 840a4d954233f8769a54ac2ada5d34a5ef80c82d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 21 19:38:35 2020 +0200
Revert "Merge pull request #12244 from pnowojski/f17258"
This reverts commit 463dc8b9cab99d6d5df07ec0e593843885e3ac72.
---
.../checkpoint/channel/ChannelStateWriterImpl.java | 19 +++++++++----------
.../planner/runtime/stream/sql/AggregateITCase.scala | 2 +-
.../flink/test/classloading/ClassLoaderITCase.java | 4 +---
.../test/classloading/jar/CustomKvStateProgram.java | 5 +----
4 files changed, 12 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fbb8ebc..3f56b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -106,13 +106,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
return result;
});
- Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
+ Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
}
@Override
public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
LOG.debug(
- "{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
+ "{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
taskName,
checkpointId,
info,
@@ -123,7 +123,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
LOG.debug(
- "{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
+ "{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
taskName,
checkpointId,
info,
@@ -134,19 +134,19 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void finishInput(long checkpointId) {
- LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
+ LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
enqueue(completeInput(checkpointId), false);
}
@Override
public void finishOutput(long checkpointId) {
- LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
+ LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
enqueue(completeOutput(checkpointId), false);
}
@Override
public void abort(long checkpointId, Throwable cause) {
- LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
+ LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
results.remove(checkpointId);
@@ -154,15 +154,15 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public ChannelStateWriteResult getWriteResult(long checkpointId) {
- LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
+ LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
ChannelStateWriteResult result = results.get(checkpointId);
- Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
+ Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
return result;
}
@Override
public void stop(long checkpointId) {
- LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
+ LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
results.remove(checkpointId);
}
@@ -172,7 +172,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
@Override
public void close() throws IOException {
- LOG.debug("close, dropping checkpoints {}", results.keySet());
results.clear();
executor.close();
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 1a9485d..73b5801 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1210,7 +1210,7 @@ class AggregateITCase(
t1.toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
val expected = List("1", "3")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ assertEquals(expected, sink.getRetractResults)
}
@Test
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 974512c..6ea054a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,9 +313,7 @@ public class ClassLoaderITCase extends TestLogger {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
- outputDir.toURI().toString(),
- "false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
- })
+ outputDir.toURI().toString()})
.build();
TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index d6f4aa1..954b8df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.util.Collector;
-import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -48,12 +47,10 @@ public class CustomKvStateProgram {
final String checkpointPath = args[1];
final int checkpointingInterval = Integer.parseInt(args[2]);
final String outputPath = args[3];
- final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
- env.enableCheckpointing(checkpointingInterval);
- unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
+ env.enableCheckpointing(checkpointingInterval);
env.setStateBackend(new FsStateBackend(checkpointPath));
DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());