You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 09:12:42 UTC

[1/4] flink git commit: [hotfix][statebackend] Removed use of rawtype access for generic collection

Repository: flink
Updated Branches:
  refs/heads/master 388a083c9 -> d1725a9a2


[hotfix][statebackend] Removed use of rawtype access for generic collection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1725a9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1725a9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1725a9a

Branch: refs/heads/master
Commit: d1725a9a2d2d4c2780312ebe82f9d3a26f018cf8
Parents: 1bfe87f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:25:15 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1725a9a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3000667..ad40c70 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1195,10 +1195,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new IOException("Error creating ColumnFamilyHandle.", e);
 		}
 
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple =
 			new Tuple2<>(columnFamily, newMetaInfo);
-		Map rawAccess = kvStateInformation;
-		rawAccess.put(descriptor.getName(), tuple);
+		kvStateInformation.put(descriptor.getName(), tuple);
 		return columnFamily;
 	}
 


[4/4] flink git commit: [FLINK-9022][state] Backend disposal in StreamTaskStateInitializer should always be performed in cleanup.

Posted by sr...@apache.org.
[FLINK-9022][state] Backend disposal in StreamTaskStateInitializer should always be performed in cleanup.

This step should be independent from the fact if the backend is still registered with the closeable registry.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/777cc1ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/777cc1ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/777cc1ab

Branch: refs/heads/master
Commit: 777cc1ab53884f12ac245e62d367c4439f7939e0
Parents: 388a083
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:27:00 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../api/operators/StreamTaskStateInitializerImpl.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/777cc1ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index d9bd089..460a52b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -165,12 +165,18 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 		} catch (Exception ex) {
 
 			// cleanup if something went wrong before results got published.
-			if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+			if (keyedStatedBackend != null) {
+				if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+					IOUtils.closeQuietly(keyedStatedBackend);
+				}
 				// release resource (e.g native resource)
 				keyedStatedBackend.dispose();
 			}
 
-			if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+			if (operatorStateBackend != null) {
+				if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+					IOUtils.closeQuietly(operatorStateBackend);
+				}
 				operatorStateBackend.dispose();
 			}
 


[3/4] flink git commit: [hotfix] Generated OperatorSubtaskDescription string should start counting from 1 for index 0

Posted by sr...@apache.org.
[hotfix] Generated OperatorSubtaskDescription string should start counting from 1 for index 0


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bfe87f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bfe87f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bfe87f1

Branch: refs/heads/master
Commit: 1bfe87f12274a96e517f6229ddbc01622cfb50a0
Parents: a455d6a
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:24:28 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/util/OperatorSubtaskDescriptionText.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1bfe87f1/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
index 65b3714..ec2e571 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
@@ -37,7 +37,7 @@ public class OperatorSubtaskDescriptionText {
 
 		this.description = operatorClass +
 				"_" + operatorId +
-				"_(" + subtaskIndex + "/" + numberOfTasks + ")";
+				"_(" + (1 + subtaskIndex) + "/" + numberOfTasks + ")";
 	}
 
 	@Override


[2/4] flink git commit: [hotfix][checkpointing] Double check if local state directory exists to avoid problem with concurrent directory creation.

Posted by sr...@apache.org.
[hotfix][checkpointing] Double check if local state directory exists to avoid problem with concurrent directory creation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a455d6a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a455d6a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a455d6a7

Branch: refs/heads/master
Commit: a455d6a77e5678140bc4e19c75c29905b25cbe0b
Parents: 777cc1a
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 10:50:24 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../state/TaskExecutorLocalStateStoresManager.java       | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a455d6a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index e7a7d8f..518ad81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -83,12 +83,13 @@ public class TaskExecutorLocalStateStoresManager {
 		this.closed = false;
 
 		for (File localStateRecoveryRootDir : localStateRootDirectories) {
-			if (!localStateRecoveryRootDir.exists()) {
 
-				if (!localStateRecoveryRootDir.mkdirs()) {
-					throw new IOException("Could not create root directory for local recovery: " +
-						localStateRecoveryRootDir);
-				}
+			if (!localStateRecoveryRootDir.exists()
+				&& !localStateRecoveryRootDir.mkdirs()
+				// we double check for exists in case another task created the directory concurrently.
+				&& !localStateRecoveryRootDir.exists()) {
+				throw new IOException("Could not create root directory for local recovery: " +
+					localStateRecoveryRootDir);
 			}
 		}