You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/02 10:19:44 UTC

flink git commit: [FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.

Repository: flink
Updated Branches:
  refs/heads/master 390d36132 -> c11f11359


[FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.

This closes #5930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11f1135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11f1135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11f1135

Branch: refs/heads/master
Commit: c11f11359b8915533ad886015d57298e3daeb821
Parents: 390d361
Author: sihuazhou <su...@163.com>
Authored: Fri Apr 27 20:42:33 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 2 12:18:36 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/DefaultOperatorStateBackend.java |  2 +-
 .../flink/runtime/state/HeapBroadcastState.java    |  2 +-
 .../RegisteredBroadcastBackendStateMetaInfo.java   | 17 +++++++++++++++++
 .../RegisteredOperatorBackendStateMetaInfo.java    | 16 ++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index edbd605..a2e49cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -636,7 +636,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 		private PartitionableListState(PartitionableListState<S> toCopy) {
 
-			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
+			this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
 		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 42e68f3..7ebf1ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -66,7 +66,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	}
 
 	private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
-		this(toCopy.stateMetaInfo, toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
+		this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
index d462b34..7204cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
@@ -52,6 +52,23 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
 		this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
 	}
 
+	public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
+
+		Preconditions.checkNotNull(copy);
+
+		this.name = copy.name;
+		this.assignmentMode = copy.assignmentMode;
+		this.keySerializer = copy.keySerializer.duplicate();
+		this.valueSerializer = copy.valueSerializer.duplicate();
+	}
+
+	/**
+	 * Creates a deep copy of this meta info; the key and value serializers are duplicated.
+	 */
+	public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
+		return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+	}
+
 	public String getName() {
 		return name;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c11f1135/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
index af289f9..a9adc8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -57,6 +57,22 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
 		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
 	}
 
+	private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
+
+		Preconditions.checkNotNull(copy);
+
+		this.name = copy.name;
+		this.partitionStateSerializer = copy.partitionStateSerializer.duplicate();
+		this.assignmentMode = copy.assignmentMode;
+	}
+
+	/**
+	 * Creates a deep copy of this meta info; the partition state serializer is duplicated.
+	 */
+	public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
+		return new RegisteredOperatorBackendStateMetaInfo<>(this);
+	}
+
 	public String getName() {
 		return name;
 	}