You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:02 UTC

[1/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints

Repository: flink
Updated Branches:
  refs/heads/master 8e3213678 -> 65fdadac8


[hotfix][rat] Add exclusion for all test snapshots/savepoints

This closes #3854.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b485307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b485307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b485307

Branch: refs/heads/master
Commit: 4b485307800d04af460fec29f6f2b34b1ff189b1
Parents: 8e32136
Author: zentol <ch...@apache.org>
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:33 2017 +0200

----------------------------------------------------------------------
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   6 +++---
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |   8 ++++----
 ...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
 ...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
 ...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
 .../FlinkKinesisConsumerMigrationTest.java      |   2 +-
 ...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
 ...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
 .../cep/operator/CEPMigration11to13Test.java    |  10 +++++-----
 .../test/resources/cep-branching-1_2-snapshot   | Bin 0 -> 6736 bytes
 .../test/resources/cep-branching-snapshot-1.2   | Bin 6736 -> 0 bytes
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 0 -> 5612 bytes
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 5612 -> 0 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 0 -> 3274 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 3274 -> 0 bytes
 .../resources/cep-single-pattern-1.2-snapshot   | Bin 0 -> 3311 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 3311 -> 0 bytes
 .../test/resources/cep-starting-1.2-snapshot    | Bin 0 -> 6526 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 6526 -> 0 bytes
 ...atefulJobSavepointFrom11MigrationITCase.java |   4 ++--
 ...atefulJobSavepointFrom12MigrationITCase.java |   4 ++--
 ...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
 pom.xml                                         |  19 ++-----------------
 27 files changed, 19 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
 		// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+		// the expected state in "kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
 		// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
 		final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
 		expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
index 6414a12..f11bf9f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
@@ -76,7 +76,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 		writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot", PARTITION_STATE);
 
 		final HashMap<KafkaTopicPartition, Long> emptyState = new HashMap<>();
-		writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state", emptyState);
+		writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot", emptyState);
 	}
 
 	private void writeSnapshot(String path, HashMap<KafkaTopicPartition, Long> state) throws Exception {
@@ -166,7 +166,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 		// restore state from binary snapshot file
 		testHarness.initializeState(
 				OperatorSnapshotUtil.readStateHandle(
-						OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
+						OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot")));
 		testHarness.open();
 
 		// assert that no partitions were found and is empty
@@ -202,10 +202,10 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 		// restore state from binary snapshot file
 		testHarness.initializeState(
 				OperatorSnapshotUtil.readStateHandle(
-						OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
+						OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot")));
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+		// the expected state in "kafka-consumer-migration-test-flink1.2-empty-state-snapshot";
 		// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
 		final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
 		for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot
new file mode 100644
index 0000000..f4dd96d
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-empty-state-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
deleted file mode 100644
index f4dd96d..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
new file mode 100644
index 0000000..45047ee
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state
deleted file mode 100644
index 45047ee..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 7629f9d..ec9a9b5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -66,7 +66,7 @@ public class FlinkKinesisConsumerMigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kinesis-consumer-migration-test-flink1.1-snapshot-empty"));
+			getResourceFilename("kinesis-consumer-migration-test-flink1.1-empty-snapshot"));
 		testHarness.open();
 
 		// assert that no state was restored

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot
new file mode 100644
index 0000000..f4dd96d
Binary files /dev/null and b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-empty-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty
deleted file mode 100644
index f4dd96d..0000000
Binary files a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.1-snapshot-empty and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 824df2d..255b8c2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -90,7 +90,7 @@ public class CEPMigration11to13Test {
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-keyed-snapshot-1.1");
+				"src/test/resources/cep-keyed-1_1-snapshot");
 		ObjectOutputStream oos = new ObjectOutputStream(out);
 		oos.writeObject(snapshot);
 		out.close();
@@ -109,7 +109,7 @@ public class CEPMigration11to13Test {
 						BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.setup();
-		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
+		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
 		harness.open();
 
 		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -209,7 +209,7 @@ public class CEPMigration11to13Test {
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
-				"src/test/resources/cep-non-keyed-snapshot-1.1");
+				"src/test/resources/cep-non-keyed-1.1-snapshot");
 		ObjectOutputStream oos = new ObjectOutputStream(out);
 		oos.writeObject(snapshot);
 		out.close();
@@ -230,7 +230,7 @@ public class CEPMigration11to13Test {
 						BasicTypeInfo.BYTE_TYPE_INFO);
 
 		harness.setup();
-		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1"));
+		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
 		harness.open();
 
 		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
@@ -361,4 +361,4 @@ public class CEPMigration11to13Test {
 			return value.getName().equals("end");
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-1_2-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
deleted file mode 100644
index 47f710e..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
new file mode 100644
index 0000000..277de1d
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
deleted file mode 100644
index 277de1d..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
new file mode 100644
index 0000000..b5ca51e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
deleted file mode 100644
index b5ca51e..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-1.2-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
deleted file mode 100644
index 255f46a..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-1.2-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
deleted file mode 100644
index c41f6c2..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
index 4d94d25..4221670 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -129,7 +129,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		executeAndSavepoint(
 				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint",
 				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
@@ -198,7 +198,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
 
 		restoreAndExecute(
 				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
 				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index e60cb5d..5f03195 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -144,7 +144,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		executeAndSavepoint(
 				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
+				"src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint",
 				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
@@ -228,7 +228,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		restoreAndExecute(
 				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
+				getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"),
 				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
 				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
 				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint
new file mode 100644
index 0000000..e63038b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
deleted file mode 100644
index e63038b..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..548993f
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
deleted file mode 100644
index 548993f..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71a6e29..8c5f622 100644
--- a/pom.xml
+++ b/pom.xml
@@ -952,23 +952,8 @@ under the License.
 						<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
 
 						<!-- snapshots -->
-						<exclude>flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot</exclude>
-						<exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot</exclude>
-						<exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state</exclude>
-						<exclude>flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot</exclude>
-						<exclude>flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot</exclude>
-						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude>
-						<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude>
-						<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude>
-						<exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude>
-						<exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude>
+						<exclude>**/src/test/resources/*-snapshot</exclude>
+						<exclude>**/src/test/resources/*-savepoint</exclude>
 
 						<!-- TweetInputFormat Test Data-->
 						<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>


[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs

Posted by ch...@apache.org.
[FLINK-6639][docs] fix code tabs in CEP docs

This closes #3952.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fadc026b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fadc026b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fadc026b

Branch: refs/heads/master
Commit: fadc026bf1e90cd001bd442e5bca595eb69907cf
Parents: 8ccaffe
Author: David Anderson <da...@alpinegizmo.com>
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fadc026b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
 </div>
+</div>
 
 For non-strict contiguity one can specify if only the first succeeding matching event will be matched, or
 all. In the latter case multiple matches will be emitted for the same beginning.


[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

Posted by ch...@apache.org.
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

This closes #3950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3

Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	private long lastEmittedWatermark1;
 	private long lastEmittedWatermark2;
 
+	private Counter numRecordsIn;
+
 	private boolean isFinished;
 
 	@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN1> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement1(record);
 								streamOperator.processElement1(record);
 							}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN2> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement2(record);
 								streamOperator.processElement2(record);
 							}


[4/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

Posted by ch...@apache.org.
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

This closes #3904.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac

Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../streaming/util/OperatorSnapshotUtil.java    | 162 ++++++++++---------
 1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
 	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
-		DataOutputStream dos = new DataOutputStream(out);
-
-		dos.writeInt(state.getOperatorChainIndex());
-
-		SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
-		Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
-		if (rawOperatorState != null) {
-			dos.writeInt(rawOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+		
+		try (DataOutputStream dos = new DataOutputStream(out)) {
+
+			dos.writeInt(state.getOperatorChainIndex());
+
+			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+			Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+			if (rawOperatorState != null) {
+				dos.writeInt(rawOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
-		if (managedOperatorState != null) {
-			dos.writeInt(managedOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+			if (managedOperatorState != null) {
+				dos.writeInt(managedOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
-		if (rawKeyedState != null) {
-			dos.writeInt(rawKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
-				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+			if (rawKeyedState != null) {
+				dos.writeInt(rawKeyedState.size());
+				for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+					SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+				}
+			} else {
+				// this means no operator states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no operator states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
-		if (managedKeyedState != null) {
-			dos.writeInt(managedKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
-				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+			if (managedKeyedState != null) {
+				dos.writeInt(managedKeyedState.size());
+				for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+					SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+				}
+			} else {
+				// this means no operator states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no operator states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		dos.flush();
-		out.close();
+			dos.flush();
+		}
 	}
 
 	public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
 		FileInputStream in = new FileInputStream(path);
-		DataInputStream dis = new DataInputStream(in);
-		int index = dis.readInt();
+		try (DataInputStream dis = new DataInputStream(in)) {
+			int index = dis.readInt();
 
-		StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+			StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-		List<OperatorStateHandle> rawOperatorState = null;
-		int numRawOperatorStates = dis.readInt();
-		if (numRawOperatorStates >= 0) {
-			rawOperatorState = new ArrayList<>();
-			for (int i = 0; i < numRawOperatorStates; i++) {
-				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+			List<OperatorStateHandle> rawOperatorState = null;
+			int numRawOperatorStates = dis.readInt();
+			if (numRawOperatorStates >= 0) {
+				rawOperatorState = new ArrayList<>();
+				for (int i = 0; i < numRawOperatorStates; i++) {
+					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
-				rawOperatorState.add(operatorState);
+					rawOperatorState.add(operatorState);
+				}
 			}
-		}
 
-		List<OperatorStateHandle> managedOperatorState = null;
-		int numManagedOperatorStates = dis.readInt();
-		if (numManagedOperatorStates >= 0) {
-			managedOperatorState = new ArrayList<>();
-			for (int i = 0; i < numManagedOperatorStates; i++) {
-				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+			List<OperatorStateHandle> managedOperatorState = null;
+			int numManagedOperatorStates = dis.readInt();
+			if (numManagedOperatorStates >= 0) {
+				managedOperatorState = new ArrayList<>();
+				for (int i = 0; i < numManagedOperatorStates; i++) {
+					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
-				managedOperatorState.add(operatorState);
+					managedOperatorState.add(operatorState);
+				}
 			}
-		}
 
-		List<KeyedStateHandle> rawKeyedState = null;
-		int numRawKeyedStates = dis.readInt();
-		if (numRawKeyedStates >= 0) {
-			rawKeyedState = new ArrayList<>();
-			for (int i = 0; i < numRawKeyedStates; i++) {
-				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+			List<KeyedStateHandle> rawKeyedState = null;
+			int numRawKeyedStates = dis.readInt();
+			if (numRawKeyedStates >= 0) {
+				rawKeyedState = new ArrayList<>();
+				for (int i = 0; i < numRawKeyedStates; i++) {
+					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
-				rawKeyedState.add(keyedState);
+					rawKeyedState.add(keyedState);
+				}
 			}
-		}
 
-		List<KeyedStateHandle> managedKeyedState = null;
-		int numManagedKeyedStates = dis.readInt();
-		if (numManagedKeyedStates >= 0) {
-			managedKeyedState = new ArrayList<>();
-			for (int i = 0; i < numManagedKeyedStates; i++) {
-				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+			List<KeyedStateHandle> managedKeyedState = null;
+			int numManagedKeyedStates = dis.readInt();
+			if (numManagedKeyedStates >= 0) {
+				managedKeyedState = new ArrayList<>();
+				for (int i = 0; i < numManagedKeyedStates; i++) {
+					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
-				managedKeyedState.add(keyedState);
+					managedKeyedState.add(keyedState);
+				}
 			}
-		}
 
-		return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+			return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+		}
 	}
 }


[3/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min for local channels

Posted by ch...@apache.org.
[FLINK-6586] InputGateMetrics return 0 as min for local channels

This closes #3907.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17ec6f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17ec6f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17ec6f02

Branch: refs/heads/master
Commit: 17ec6f020b779efe9152456f4ef33f6f802e4f67
Parents: fadc026
Author: zentol <ch...@apache.org>
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../io/network/partition/consumer/InputGateMetrics.java       | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17ec6f02/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
 		int min = Integer.MAX_VALUE;
 
 		Collection<InputChannel> channels = inputGate.getInputChannels().values();
-		if (channels.isEmpty()) {
-			// meaningful value when no channels exist:
-			return 0;
-		}
 
 		for (InputChannel channel : channels) {
 			if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
 			}
 		}
 
+		if (min == Integer.MAX_VALUE) { // in case all channels are local, or the channel collection was empty
+			return 0;
+		}
 		return min;
 	}