You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/09/25 16:14:06 UTC

[2/2] flink git commit: [FLINK-7541][runtime] Refactor StateAssignmentOperation and use OperatorID

[FLINK-7541][runtime] Refactor StateAssignmentOperation and use OperatorID

This is not a complete refactor; some methods still rely on the order of the
new and old operators.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1b2b83d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1b2b83d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1b2b83d

Branch: refs/heads/master
Commit: f1b2b83d639658907bbc521f967e2673c4c866a1
Parents: 79b916c
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Aug 25 15:23:15 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Sep 25 17:55:55 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/StateAssignmentOperation.java    | 270 +++++++++++--------
 .../runtime/jobgraph/OperatorInstanceID.java    |  73 +++++
 2 files changed, 234 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1b2b83d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index cc9f9cd..76db912 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -30,6 +31,9 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.ArrayListMultimap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * This class encapsulates the operation of assigning restored state when restoring from a checkpoint.
  */
@@ -116,9 +123,7 @@ public class StateAssignmentOperation {
 			executionJobVertex.getMaxParallelism(),
 			newParallelism);
 
-		//2. Redistribute the operator state.
 		/**
-		 *
 		 * Redistribute ManagedOperatorStates and RawOperatorStates from old parallelism to new parallelism.
 		 *
 		 * The old ManagedOperatorStates with old parallelism 3:
@@ -137,13 +142,27 @@ public class StateAssignmentOperation {
 		 * op2   state2,0	  state2,1 	   state2,2		state2,3
 		 * op3   state3,0	  state3,1 	   state3,2		state3,3
 		 */
-		List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates = new ArrayList<>();
-		List<List<Collection<OperatorStateHandle>>> newRawOperatorStates = new ArrayList<>();
-
-		reDistributePartitionableStates(operatorStates, newParallelism, newManagedOperatorStates, newRawOperatorStates);
-
+		Multimap<OperatorInstanceID, OperatorStateHandle> newManagedOperatorStates = ArrayListMultimap.create();
+		Multimap<OperatorInstanceID, OperatorStateHandle> newRawOperatorStates = ArrayListMultimap.create();
+
+		reDistributePartitionableStates(
+			operatorStates,
+			newParallelism,
+			operatorIDs,
+			newManagedOperatorStates,
+			newRawOperatorStates);
+
+		Multimap<OperatorInstanceID, KeyedStateHandle> newManagedKeyedState = ArrayListMultimap.create();
+		Multimap<OperatorInstanceID, KeyedStateHandle> newRawKeyedState = ArrayListMultimap.create();
+
+		reDistributeKeyedStates(
+			operatorStates,
+			newParallelism,
+			operatorIDs,
+			keyGroupPartitions,
+			newManagedKeyedState,
+			newRawKeyedState);
 
-		//3. Compute TaskStateHandles of every subTask in the executionJobVertex
 		/**
 		 *  An executionJobVertex's all state handles needed to restore are something like a matrix
 		 *
@@ -153,113 +172,122 @@ public class StateAssignmentOperation {
 		 * op2   sh(2,0)	 sh(2,1)	   sh(2,2)		sh(2,3)
 		 * op3   sh(3,0)	 sh(3,1)	   sh(3,2)		sh(3,3)
 		 *
-		 * we will compute the state handles column by column.
-		 *
 		 */
+		assignTaskStateToExecutionJobVertices(
+			executionJobVertex,
+			newManagedOperatorStates,
+			newRawOperatorStates,
+			newManagedKeyedState,
+			newRawKeyedState,
+			newParallelism);
+	}
+
+	private void assignTaskStateToExecutionJobVertices(
+			ExecutionJobVertex executionJobVertex,
+			Multimap<OperatorInstanceID, OperatorStateHandle> subManagedOperatorState,
+			Multimap<OperatorInstanceID, OperatorStateHandle> subRawOperatorState,
+			Multimap<OperatorInstanceID, KeyedStateHandle> subManagedKeyedState,
+			Multimap<OperatorInstanceID, KeyedStateHandle> subRawKeyedState,
+			int newParallelism) {
+
+		List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+
 		for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
 
 			Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
 				.getCurrentExecutionAttempt();
 
-			Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedState = null;
-
-			List<Collection<OperatorStateHandle>> subManagedOperatorState = new ArrayList<>();
-			List<Collection<OperatorStateHandle>> subRawOperatorState = new ArrayList<>();
-
+			TaskStateSnapshot taskState = new TaskStateSnapshot();
+			boolean statelessTask = true;
 
-			for (int operatorIndex = 0; operatorIndex < operatorIDs.size(); operatorIndex++) {
-				OperatorState operatorState = operatorStates.get(operatorIndex);
-				int oldParallelism = operatorState.getParallelism();
+			for (OperatorID operatorID : operatorIDs) {
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID);
 
-				// PartitionedState
-				reAssignSubPartitionableState(
-					newManagedOperatorStates,
-					newRawOperatorStates,
-					subTaskIndex,
-					operatorIndex,
+				OperatorSubtaskState operatorSubtaskState = operatorSubtaskStateFrom(
+					instanceID,
 					subManagedOperatorState,
-					subRawOperatorState);
+					subRawOperatorState,
+					subManagedKeyedState,
+					subRawKeyedState);
 
-				// KeyedState
-				if (isHeadOperator(operatorIndex, operatorIDs)) {
-					subKeyedState = reAssignSubKeyedStates(
-						operatorState,
-						keyGroupPartitions,
-						subTaskIndex,
-						newParallelism,
-						oldParallelism);
+				if (operatorSubtaskState.hasState()) {
+					statelessTask = false;
 				}
+				taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
 			}
 
-			// check if a stateless task
-			if (!allElementsAreNull(subManagedOperatorState) ||
-				!allElementsAreNull(subRawOperatorState) ||
-				subKeyedState != null) {
-
-				TaskStateSnapshot taskState = new TaskStateSnapshot();
-
-				for (int i = 0; i < operatorIDs.size(); ++i) {
-
-					OperatorID operatorID = operatorIDs.get(i);
-
-					Collection<KeyedStateHandle> rawKeyed = Collections.emptyList();
-					Collection<KeyedStateHandle> managedKeyed = Collections.emptyList();
-
-					// keyed state case
-					if (subKeyedState != null) {
-						managedKeyed = subKeyedState.f0;
-						rawKeyed = subKeyedState.f1;
-					}
-
-					OperatorSubtaskState operatorSubtaskState =
-						new OperatorSubtaskState(
-							subManagedOperatorState.get(i),
-							subRawOperatorState.get(i),
-							managedKeyed,
-							rawKeyed
-						);
-
-					taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
-				}
-
+			if (!statelessTask) {
 				currentExecutionAttempt.setInitialState(taskState);
 			}
 		}
 	}
 
+	private static OperatorSubtaskState operatorSubtaskStateFrom(
+			OperatorInstanceID instanceID,
+			Multimap<OperatorInstanceID, OperatorStateHandle> subManagedOperatorState,
+			Multimap<OperatorInstanceID, OperatorStateHandle> subRawOperatorState,
+			Multimap<OperatorInstanceID, KeyedStateHandle> subManagedKeyedState,
+			Multimap<OperatorInstanceID, KeyedStateHandle> subRawKeyedState) {
+
+		boolean hasManagedKeyed = subManagedKeyedState.containsKey(instanceID);
+		boolean hasRawKeyed = subRawKeyedState.containsKey(instanceID);
+		boolean hasOperatorState = subManagedOperatorState.containsKey(instanceID)
+			|| subRawOperatorState.containsKey(instanceID);
+
+		if (!hasOperatorState && !hasManagedKeyed && !hasRawKeyed) {
+			return new OperatorSubtaskState();
+		}
+		// Raw keyed state without managed keyed state is rejected as an illegal state.
+		checkState(hasManagedKeyed || !hasRawKeyed);
+		return new OperatorSubtaskState(
+			subManagedOperatorState.get(instanceID),
+			subRawOperatorState.get(instanceID),
+			subManagedKeyedState.get(instanceID),
+			subRawKeyedState.get(instanceID));
+	}
+
 	private static boolean isHeadOperator(int opIdx, List<OperatorID> operatorIDs) {
 		return opIdx == operatorIDs.size() - 1;
 	}
 
 	public void checkParallelismPreconditions(List<OperatorState> operatorStates, ExecutionJobVertex executionJobVertex) {
-
 		for (OperatorState operatorState : operatorStates) {
 			checkParallelismPreconditions(operatorState, executionJobVertex);
 		}
 	}
 
-
-	private void reAssignSubPartitionableState(
-			List<List<Collection<OperatorStateHandle>>> newMangedOperatorStates,
-			List<List<Collection<OperatorStateHandle>>> newRawOperatorStates,
-			int subTaskIndex, int operatorIndex,
-			List<Collection<OperatorStateHandle>> subManagedOperatorState,
-			List<Collection<OperatorStateHandle>> subRawOperatorState) {
-
-		if (newMangedOperatorStates.get(operatorIndex) != null && !newMangedOperatorStates.get(operatorIndex).isEmpty()) {
-			Collection<OperatorStateHandle> operatorStateHandles = newMangedOperatorStates.get(operatorIndex).get(subTaskIndex);
-			subManagedOperatorState.add(operatorStateHandles != null ? operatorStateHandles : Collections.<OperatorStateHandle>emptyList());
-		} else {
-			subManagedOperatorState.add(Collections.<OperatorStateHandle>emptyList());
-		}
-		if (newRawOperatorStates.get(operatorIndex) != null && !newRawOperatorStates.get(operatorIndex).isEmpty()) {
-			Collection<OperatorStateHandle> operatorStateHandles = newRawOperatorStates.get(operatorIndex).get(subTaskIndex);
-			subRawOperatorState.add(operatorStateHandles != null ? operatorStateHandles : Collections.<OperatorStateHandle>emptyList());
-		} else {
-			subRawOperatorState.add(Collections.<OperatorStateHandle>emptyList());
+	private void reDistributeKeyedStates(
+			List<OperatorState> oldOperatorStates,
+			int newParallelism,
+			List<OperatorID> newOperatorIDs,
+			List<KeyGroupRange> newKeyGroupPartitions,
+			Multimap<OperatorInstanceID, KeyedStateHandle> newManagedKeyedState,
+			Multimap<OperatorInstanceID, KeyedStateHandle> newRawKeyedState) {
+		//TODO: rewrite this method to only use OperatorID
+		checkState(newOperatorIDs.size() == oldOperatorStates.size(),
+			"This method still depends on the order of the new and old operators");
+
+		for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
+			OperatorState operatorState = oldOperatorStates.get(operatorIndex);
+			int oldParallelism = operatorState.getParallelism();
+
+			for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
+				if (isHeadOperator(operatorIndex, newOperatorIDs)) {
+					Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
+						operatorState,
+						newKeyGroupPartitions,
+						subTaskIndex,
+						newParallelism,
+						oldParallelism);
+					newManagedKeyedState.putAll(instanceID, subKeyedStates.f0);
+					newRawKeyedState.putAll(instanceID, subKeyedStates.f1);
+				}
+			}
 		}
 	}
 
+	// TODO rewrite based on operator id
 	private Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> reAssignSubKeyedStates(
 			OperatorState operatorState,
 			List<KeyGroupRange> keyGroupPartitions,
@@ -284,48 +312,50 @@ public class StateAssignmentOperation {
 		}
 
 		if (subManagedKeyedState.isEmpty() && subRawKeyedState.isEmpty()) {
-			return null;
+			return new Tuple2<>(Collections.emptyList(), Collections.emptyList());
 		} else {
 			return new Tuple2<>(subManagedKeyedState, subRawKeyedState);
 		}
 	}
 
-
-	private <X> boolean allElementsAreNull(List<X> nonPartitionableStates) {
-		for (Object streamStateHandle : nonPartitionableStates) {
-			if (streamStateHandle != null) {
-				return false;
-			}
-		}
-		return true;
-	}
-
 	private void reDistributePartitionableStates(
-			List<OperatorState> operatorStates, int newParallelism,
-			List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates,
-			List<List<Collection<OperatorStateHandle>>> newRawOperatorStates) {
+			List<OperatorState> oldOperatorStates,
+			int newParallelism,
+			List<OperatorID> newOperatorIDs,
+			Multimap<OperatorInstanceID, OperatorStateHandle> newManagedOperatorStates,
+			Multimap<OperatorInstanceID, OperatorStateHandle> newRawOperatorStates) {
+
+		//TODO: rewrite this method to only use OperatorID
+		checkState(newOperatorIDs.size() == oldOperatorStates.size(),
+			"This method still depends on the order of the new and old operators");
 
 		//collect the old partitionable state
 		List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>();
 		List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>();
 
-		collectPartionableStates(operatorStates, oldManagedOperatorStates, oldRawOperatorStates);
-
+		collectPartionableStates(oldOperatorStates, oldManagedOperatorStates, oldRawOperatorStates);
 
 		//redistribute
 		OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 
-		for (int operatorIndex = 0; operatorIndex < operatorStates.size(); operatorIndex++) {
-			int oldParallelism = operatorStates.get(operatorIndex).getParallelism();
-			newManagedOperatorStates.add(applyRepartitioner(opStateRepartitioner,
-				oldManagedOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
-			newRawOperatorStates.add(applyRepartitioner(opStateRepartitioner,
-				oldRawOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
-
+		for (int operatorIndex = 0; operatorIndex < oldOperatorStates.size(); operatorIndex++) {
+			OperatorID operatorID = newOperatorIDs.get(operatorIndex);
+			int oldParallelism = oldOperatorStates.get(operatorIndex).getParallelism();
+			newManagedOperatorStates.putAll(applyRepartitioner(
+				operatorID,
+				opStateRepartitioner,
+				oldManagedOperatorStates.get(operatorIndex),
+				oldParallelism,
+				newParallelism));
+			newRawOperatorStates.putAll(applyRepartitioner(
+				operatorID,
+				opStateRepartitioner,
+				oldRawOperatorStates.get(operatorIndex),
+				oldParallelism,
+				newParallelism));
 		}
 	}
 
-
 	private void collectPartionableStates(
 			List<OperatorState> operatorStates,
 			List<List<OperatorStateHandle>> managedOperatorStates,
@@ -356,7 +386,6 @@ public class StateAssignmentOperation {
 		}
 	}
 
-
 	/**
 	 * Collect {@link KeyGroupsStateHandle  managedKeyedStateHandles} which have intersection with given
 	 * {@link KeyGroupRange} from {@link TaskState operatorState}
@@ -524,6 +553,28 @@ public class StateAssignmentOperation {
 		}
 	}
 
+	public static Multimap<OperatorInstanceID, OperatorStateHandle> applyRepartitioner(
+			OperatorID operatorID,
+			OperatorStateRepartitioner opStateRepartitioner,
+			List<OperatorStateHandle> chainOpParallelStates,
+			int oldParallelism,
+			int newParallelism) {
+		Multimap<OperatorInstanceID, OperatorStateHandle> result = ArrayListMultimap.create();
+		// Repartition per subtask first, then key each subtask's handles by (subtaskIndex, operatorID).
+		List<Collection<OperatorStateHandle>> states = applyRepartitioner(
+			opStateRepartitioner,
+			chainOpParallelStates,
+			oldParallelism,
+			newParallelism);
+		// checkNotNull must receive the reference itself; a boxed boolean is never null.
+		for (int subtaskIndex = 0; subtaskIndex < states.size(); subtaskIndex++) {
+			checkNotNull(states.get(subtaskIndex), "states.get(subtaskIndex) is null");
+			result.putAll(OperatorInstanceID.of(subtaskIndex, operatorID), states.get(subtaskIndex));
+		}
+
+		return result;
+	}
+
 	/**
 	 * Repartitions the given operator state using the given {@link OperatorStateRepartitioner} with respect to the new
 	 * parallelism.
@@ -534,6 +585,7 @@ public class StateAssignmentOperation {
 	 * @param newParallelism        parallelism with which the state should be partitioned
 	 * @return repartitioned state
 	 */
+	// TODO rewrite based on operator id
 	public static List<Collection<OperatorStateHandle>> applyRepartitioner(
 			OperatorStateRepartitioner opStateRepartitioner,
 			List<OperatorStateHandle> chainOpParallelStates,

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b2b83d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorInstanceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorInstanceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorInstanceID.java
new file mode 100644
index 0000000..76bcdbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorInstanceID.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.util.Objects;
+
+/**
+ * An ID for physical instance of the operator.
+ */
+public class OperatorInstanceID {
+
+	private final int subtaskId;
+	private final OperatorID operatorId;
+
+	/** Factory shorthand for {@link #OperatorInstanceID(int, OperatorID)}. */
+	public static OperatorInstanceID of(int subtaskId, OperatorID operatorID) {
+		return new OperatorInstanceID(subtaskId, operatorID);
+	}
+
+	/** Creates an ID from a subtask ID and an operator ID. */
+	public OperatorInstanceID(int subtaskId, OperatorID operatorId) {
+		this.subtaskId = subtaskId;
+		this.operatorId = operatorId;
+	}
+
+	public int getSubtaskId() {
+		return subtaskId;
+	}
+
+	public OperatorID getOperatorId() {
+		return operatorId;
+	}
+
+	@Override
+	public int hashCode() {
+		// Same value as Objects.hash(subtaskId, operatorId), without the varargs array and boxing.
+		return 31 * (31 + subtaskId) + Objects.hashCode(operatorId);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		// instanceof is false for null, so no separate null check is needed.
+		if (obj instanceof OperatorInstanceID) {
+			OperatorInstanceID that = (OperatorInstanceID) obj;
+			return subtaskId == that.subtaskId && Objects.equals(operatorId, that.operatorId);
+		}
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("<%d, %s>", subtaskId, operatorId);
+	}
+}