You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/02 10:19:44 UTC
flink git commit: [FLINK-9263][state] Fix concurrency problem in
DefaultOperatorStateBackend.
Repository: flink
Updated Branches:
refs/heads/master 390d36132 -> c11f11359
[FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.
This closes #5930.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11f1135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11f1135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11f1135
Branch: refs/heads/master
Commit: c11f11359b8915533ad886015d57298e3daeb821
Parents: 390d361
Author: sihuazhou <su...@163.com>
Authored: Fri Apr 27 20:42:33 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 2 12:18:36 2018 +0200
----------------------------------------------------------------------
.../runtime/state/DefaultOperatorStateBackend.java | 2 +-
.../flink/runtime/state/HeapBroadcastState.java | 2 +-
.../RegisteredBroadcastBackendStateMetaInfo.java | 17 +++++++++++++++++
.../RegisteredOperatorBackendStateMetaInfo.java | 16 ++++++++++++++++
4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index edbd605..a2e49cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -636,7 +636,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private PartitionableListState(PartitionableListState<S> toCopy) {
- this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
+ this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
}
public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 42e68f3..7ebf1ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -66,7 +66,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
}
private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
- this(toCopy.stateMetaInfo, toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
+ this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
index d462b34..7204cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
@@ -52,6 +52,23 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
}
+ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
+
+ Preconditions.checkNotNull(copy);
+
+ this.name = copy.name;
+ this.assignmentMode = copy.assignmentMode;
+ this.keySerializer = copy.keySerializer.duplicate();
+ this.valueSerializer = copy.valueSerializer.duplicate();
+ }
+
+ /**
+ * Creates a deep copy of itself.
+ */
+ public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
+ return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+ }
+
public String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
index af289f9..a9adc8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -57,6 +57,22 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
}
+ private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
+
+ Preconditions.checkNotNull(copy);
+
+ this.name = copy.name;
+ this.partitionStateSerializer = copy.partitionStateSerializer.duplicate();
+ this.assignmentMode = copy.assignmentMode;
+ }
+
+ /**
+ * Creates a deep copy of itself.
+ */
+ public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
+ return new RegisteredOperatorBackendStateMetaInfo<>(this);
+ }
+
public String getName() {
return name;
}