You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:02 UTC
[1/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints
Repository: flink
Updated Branches:
refs/heads/master 8e3213678 -> 65fdadac8
[hotfix][rat] Add exclusion for all test snapshots/savepoints
This closes #3854.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b485307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b485307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b485307
Branch: refs/heads/master
Commit: 4b485307800d04af460fec29f6f2b34b1ff189b1
Parents: 8e32136
Author: zentol <ch...@apache.org>
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:33 2017 +0200
----------------------------------------------------------------------
...inkKafkaConsumerBaseFrom11MigrationTest.java | 6 +++---
...inkKafkaConsumerBaseFrom12MigrationTest.java | 8 ++++----
...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
.../FlinkKinesisConsumerMigrationTest.java | 2 +-
...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
.../cep/operator/CEPMigration11to13Test.java | 10 +++++-----
.../test/resources/cep-branching-1_2-snapshot | Bin 0 -> 6736 bytes
.../test/resources/cep-branching-snapshot-1.2 | Bin 6736 -> 0 bytes
.../src/test/resources/cep-keyed-1_1-snapshot | Bin 0 -> 5612 bytes
.../src/test/resources/cep-keyed-snapshot-1.1 | Bin 5612 -> 0 bytes
.../test/resources/cep-non-keyed-1.1-snapshot | Bin 0 -> 3274 bytes
.../test/resources/cep-non-keyed-snapshot-1.1 | Bin 3274 -> 0 bytes
.../resources/cep-single-pattern-1.2-snapshot | Bin 0 -> 3311 bytes
.../resources/cep-single-pattern-snapshot-1.2 | Bin 3311 -> 0 bytes
.../test/resources/cep-starting-1.2-snapshot | Bin 0 -> 6526 bytes
.../test/resources/cep-starting-snapshot-1.2 | Bin 6526 -> 0 bytes
...atefulJobSavepointFrom11MigrationITCase.java | 4 ++--
...atefulJobSavepointFrom12MigrationITCase.java | 4 ++--
...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
pom.xml | 19 ++-----------------
27 files changed, 19 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
- getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+ getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
- getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+ getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
- // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+ // the expected state in "kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
index 6414a12..f11bf9f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
@@ -76,7 +76,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot", PARTITION_STATE);
final HashMap<KafkaTopicPartition, Long> emptyState = new HashMap<>();
- writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state", emptyState);
+ writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot", emptyState);
}
private void writeSnapshot(String path, HashMap<KafkaTopicPartition, Long> state) throws Exception {
@@ -166,7 +166,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
// restore state from binary snapshot file
testHarness.initializeState(
OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
+ OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot")));
testHarness.open();
// assert that no partitions were found and is empty
@@ -202,10 +202,10 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
// restore state from binary snapshot file
testHarness.initializeState(
OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
+ OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot")));
testHarness.open();
- // the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+ // the expected state in "kafka-consumer-migration-test-flink1.2-empty-state-snapshot";
// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot
new file mode 100644
index 0000000..f4dd96d
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
deleted file mode 100644
index f4dd96d..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
new file mode 100644
index 0000000..45047ee
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state
deleted file mode 100644
index 45047ee..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 7629f9d..ec9a9b5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -66,7 +66,7 @@ public class FlinkKinesisConsumerMigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
- getResourceFilename("kinesis-consumer-migration-test-flink1.1-snapshot-empty"));
+ getResourceFilename("kinesis-consumer-migration-test-flink1.1-empty-snapshot"));
testHarness.open();
// assert that no state was restored
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot
new file mode 100644
index 0000000..f4dd96d
Binary files /dev/null and b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty
deleted file mode 100644
index f4dd96d..0000000
Binary files a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 824df2d..255b8c2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -90,7 +90,7 @@ public class CEPMigration11to13Test {
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot = harness.snapshot(1, 1);
FileOutputStream out = new FileOutputStream(
- "src/test/resources/cep-keyed-snapshot-1.1");
+ "src/test/resources/cep-keyed-1_1-snapshot");
ObjectOutputStream oos = new ObjectOutputStream(out);
oos.writeObject(snapshot);
out.close();
@@ -109,7 +109,7 @@ public class CEPMigration11to13Test {
BasicTypeInfo.INT_TYPE_INFO);
harness.setup();
- harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
+ harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
harness.open();
harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -209,7 +209,7 @@ public class CEPMigration11to13Test {
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot = harness.snapshot(1, 1);
FileOutputStream out = new FileOutputStream(
- "src/test/resources/cep-non-keyed-snapshot-1.1");
+ "src/test/resources/cep-non-keyed-1.1-snapshot");
ObjectOutputStream oos = new ObjectOutputStream(out);
oos.writeObject(snapshot);
out.close();
@@ -230,7 +230,7 @@ public class CEPMigration11to13Test {
BasicTypeInfo.BYTE_TYPE_INFO);
harness.setup();
- harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1"));
+ harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
harness.open();
harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -361,4 +361,4 @@ public class CEPMigration11to13Test {
return value.getName().equals("end");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
deleted file mode 100644
index 47f710e..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
new file mode 100644
index 0000000..277de1d
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
deleted file mode 100644
index 277de1d..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
new file mode 100644
index 0000000..b5ca51e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
deleted file mode 100644
index b5ca51e..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
deleted file mode 100644
index 255f46a..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
deleted file mode 100644
index c41f6c2..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
index 4d94d25..4221670 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -129,7 +129,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
executeAndSavepoint(
env,
- "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+ "src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint",
new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
@@ -198,7 +198,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
- getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+ getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index e60cb5d..5f03195 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -144,7 +144,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
executeAndSavepoint(
env,
- "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
+ "src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint",
new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
@@ -228,7 +228,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
- getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
+ getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"),
new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint
new file mode 100644
index 0000000..e63038b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
deleted file mode 100644
index e63038b..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..548993f
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
deleted file mode 100644
index 548993f..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71a6e29..8c5f622 100644
--- a/pom.xml
+++ b/pom.xml
@@ -952,23 +952,8 @@ under the License.
<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
<!-- snapshots -->
- <exclude>flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot</exclude>
- <exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot</exclude>
- <exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state</exclude>
- <exclude>flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot</exclude>
- <exclude>flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot</exclude>
- <exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude>
- <exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude>
- <exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude>
- <exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude>
- <exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude>
+ <exclude>**/src/test/resources/*-snapshot</exclude>
+ <exclude>**/src/test/resources/*-savepoint</exclude>
<!-- TweetInputFormat Test Data-->
<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs
Posted by ch...@apache.org.
[FLINK-6639][docs] fix code tabs in CEP docs
This closes #3952.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fadc026b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fadc026b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fadc026b
Branch: refs/heads/master
Commit: fadc026bf1e90cd001bd442e5bca595eb69907cf
Parents: 8ccaffe
Author: David Anderson <da...@alpinegizmo.com>
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
docs/dev/libs/cep.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fadc026b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
{% endhighlight %}
</div>
+</div>
For non-strict contiguity one can specify if only the first succeeding matching event will be matched, or
all. In the latter case multiple matches will be emitted for the same beginning.
[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
Posted by ch...@apache.org.
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
This closes #3950.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3
Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
private long lastEmittedWatermark1;
private long lastEmittedWatermark2;
+ private Counter numRecordsIn;
+
private boolean isFinished;
@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
if (isFinished) {
return false;
}
+ if (numRecordsIn == null) {
+ numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ }
while (true) {
if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
else {
StreamRecord<IN1> record = recordOrWatermark.asRecord();
synchronized (lock) {
+ numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement1(record);
}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
else {
StreamRecord<IN2> record = recordOrWatermark.asRecord();
synchronized (lock) {
+ numRecordsIn.inc();
streamOperator.setKeyContextElement2(record);
streamOperator.processElement2(record);
}
[4/5] flink git commit: [FLINK-6439] Fix close OutputStream &&
InputStream in OperatorSnapshotUtil
Posted by ch...@apache.org.
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil
This closes #3904.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac
Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
.../streaming/util/OperatorSnapshotUtil.java | 162 ++++++++++---------
1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
- DataOutputStream dos = new DataOutputStream(out);
-
- dos.writeInt(state.getOperatorChainIndex());
-
- SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
- Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
- if (rawOperatorState != null) {
- dos.writeInt(rawOperatorState.size());
- for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
- SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+ try (DataOutputStream dos = new DataOutputStream(out)) {
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+ SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+ Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
- if (managedOperatorState != null) {
- dos.writeInt(managedOperatorState.size());
- for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
- SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
- if (rawKeyedState != null) {
- dos.writeInt(rawKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
- SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
- if (managedKeyedState != null) {
- dos.writeInt(managedKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
- SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- dos.flush();
- out.close();
+ dos.flush();
+ }
}
public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
FileInputStream in = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(in);
- int index = dis.readInt();
+ try (DataInputStream dis = new DataInputStream(in)) {
+ int index = dis.readInt();
- StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+ StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
- List<OperatorStateHandle> rawOperatorState = null;
- int numRawOperatorStates = dis.readInt();
- if (numRawOperatorStates >= 0) {
- rawOperatorState = new ArrayList<>();
- for (int i = 0; i < numRawOperatorStates; i++) {
- OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ List<OperatorStateHandle> rawOperatorState = null;
+ int numRawOperatorStates = dis.readInt();
+ if (numRawOperatorStates >= 0) {
+ rawOperatorState = new ArrayList<>();
+ for (int i = 0; i < numRawOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
dis);
- rawOperatorState.add(operatorState);
+ rawOperatorState.add(operatorState);
+ }
}
- }
- List<OperatorStateHandle> managedOperatorState = null;
- int numManagedOperatorStates = dis.readInt();
- if (numManagedOperatorStates >= 0) {
- managedOperatorState = new ArrayList<>();
- for (int i = 0; i < numManagedOperatorStates; i++) {
- OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ List<OperatorStateHandle> managedOperatorState = null;
+ int numManagedOperatorStates = dis.readInt();
+ if (numManagedOperatorStates >= 0) {
+ managedOperatorState = new ArrayList<>();
+ for (int i = 0; i < numManagedOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
dis);
- managedOperatorState.add(operatorState);
+ managedOperatorState.add(operatorState);
+ }
}
- }
- List<KeyedStateHandle> rawKeyedState = null;
- int numRawKeyedStates = dis.readInt();
- if (numRawKeyedStates >= 0) {
- rawKeyedState = new ArrayList<>();
- for (int i = 0; i < numRawKeyedStates; i++) {
- KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ List<KeyedStateHandle> rawKeyedState = null;
+ int numRawKeyedStates = dis.readInt();
+ if (numRawKeyedStates >= 0) {
+ rawKeyedState = new ArrayList<>();
+ for (int i = 0; i < numRawKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
dis);
- rawKeyedState.add(keyedState);
+ rawKeyedState.add(keyedState);
+ }
}
- }
- List<KeyedStateHandle> managedKeyedState = null;
- int numManagedKeyedStates = dis.readInt();
- if (numManagedKeyedStates >= 0) {
- managedKeyedState = new ArrayList<>();
- for (int i = 0; i < numManagedKeyedStates; i++) {
- KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ List<KeyedStateHandle> managedKeyedState = null;
+ int numManagedKeyedStates = dis.readInt();
+ if (numManagedKeyedStates >= 0) {
+ managedKeyedState = new ArrayList<>();
+ for (int i = 0; i < numManagedKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
dis);
- managedKeyedState.add(keyedState);
+ managedKeyedState.add(keyedState);
+ }
}
- }
- return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ }
}
}
[3/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min
for local channels
Posted by ch...@apache.org.
[FLINK-6586] InputGateMetrics return 0 as min for local channels
This closes #3907.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17ec6f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17ec6f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17ec6f02
Branch: refs/heads/master
Commit: 17ec6f020b779efe9152456f4ef33f6f802e4f67
Parents: fadc026
Author: zentol <ch...@apache.org>
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
.../io/network/partition/consumer/InputGateMetrics.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/17ec6f02/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
int min = Integer.MAX_VALUE;
Collection<InputChannel> channels = inputGate.getInputChannels().values();
- if (channels.isEmpty()) {
- // meaningful value when no channels exist:
- return 0;
- }
for (InputChannel channel : channels) {
if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
}
}
+ if (min == Integer.MAX_VALUE) { // in case all channels are local, or the channel collection was empty
+ return 0;
+ }
return min;
}