You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 09:12:42 UTC
[1/4] flink git commit: [hotfix][statebackend] Removed use of rawtype
access for generic collection
Repository: flink
Updated Branches:
refs/heads/master 388a083c9 -> d1725a9a2
[hotfix][statebackend] Removed use of rawtype access for generic collection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1725a9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1725a9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1725a9a
Branch: refs/heads/master
Commit: d1725a9a2d2d4c2780312ebe82f9d3a26f018cf8
Parents: 1bfe87f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:25:15 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200
----------------------------------------------------------------------
.../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d1725a9a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3000667..ad40c70 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1195,10 +1195,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new IOException("Error creating ColumnFamilyHandle.", e);
}
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple =
new Tuple2<>(columnFamily, newMetaInfo);
- Map rawAccess = kvStateInformation;
- rawAccess.put(descriptor.getName(), tuple);
+ kvStateInformation.put(descriptor.getName(), tuple);
return columnFamily;
}
[4/4] flink git commit: [FLINK-9022][state] Backend disposal in
StreamTaskStateInitializer should always be performed in cleanup.
Posted by sr...@apache.org.
[FLINK-9022][state] Backend disposal in StreamTaskStateInitializer should always be performed in cleanup.
This step should not depend on whether the backend is still registered with the closeable registry.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/777cc1ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/777cc1ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/777cc1ab
Branch: refs/heads/master
Commit: 777cc1ab53884f12ac245e62d367c4439f7939e0
Parents: 388a083
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:27:00 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200
----------------------------------------------------------------------
.../api/operators/StreamTaskStateInitializerImpl.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/777cc1ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index d9bd089..460a52b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -165,12 +165,18 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
} catch (Exception ex) {
// cleanup if something went wrong before results got published.
- if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+ if (keyedStatedBackend != null) {
+ if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+ IOUtils.closeQuietly(keyedStatedBackend);
+ }
// release resource (e.g native resource)
keyedStatedBackend.dispose();
}
- if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+ if (operatorStateBackend != null) {
+ if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+ IOUtils.closeQuietly(operatorStateBackend);
+ }
operatorStateBackend.dispose();
}
[3/4] flink git commit: [hotfix] Generated OperatorSubtaskDescription
string should start counting from 1 for index 0
Posted by sr...@apache.org.
[hotfix] Generated OperatorSubtaskDescription string should start counting from 1 for index 0
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bfe87f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bfe87f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bfe87f1
Branch: refs/heads/master
Commit: 1bfe87f12274a96e517f6229ddbc01622cfb50a0
Parents: a455d6a
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:24:28 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/util/OperatorSubtaskDescriptionText.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1bfe87f1/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
index 65b3714..ec2e571 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/OperatorSubtaskDescriptionText.java
@@ -37,7 +37,7 @@ public class OperatorSubtaskDescriptionText {
this.description = operatorClass +
"_" + operatorId +
- "_(" + subtaskIndex + "/" + numberOfTasks + ")";
+ "_(" + (1 + subtaskIndex) + "/" + numberOfTasks + ")";
}
@Override
[2/4] flink git commit: [hotfix][checkpointing] Double check if local
state directory exists to avoid problem with concurrent directory creation.
Posted by sr...@apache.org.
[hotfix][checkpointing] Double check if local state directory exists to avoid problem with concurrent directory creation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a455d6a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a455d6a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a455d6a7
Branch: refs/heads/master
Commit: a455d6a77e5678140bc4e19c75c29905b25cbe0b
Parents: 777cc1a
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 10:50:24 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200
----------------------------------------------------------------------
.../state/TaskExecutorLocalStateStoresManager.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a455d6a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index e7a7d8f..518ad81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -83,12 +83,13 @@ public class TaskExecutorLocalStateStoresManager {
this.closed = false;
for (File localStateRecoveryRootDir : localStateRootDirectories) {
- if (!localStateRecoveryRootDir.exists()) {
- if (!localStateRecoveryRootDir.mkdirs()) {
- throw new IOException("Could not create root directory for local recovery: " +
- localStateRecoveryRootDir);
- }
+ if (!localStateRecoveryRootDir.exists()
+ && !localStateRecoveryRootDir.mkdirs()
+ // we double check for exists in case another task created the directory concurrently.
+ && !localStateRecoveryRootDir.exists()) {
+ throw new IOException("Could not create root directory for local recovery: " +
+ localStateRecoveryRootDir);
}
}