You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:21 UTC
[14/50] [abbrv] flink git commit: [FLINK-5892] Add new StateAssignmentOperationV2
[FLINK-5892] Add new StateAssignmentOperationV2
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8045faba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8045faba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8045faba
Branch: refs/heads/table-retraction
Commit: 8045fabac736cc8c6b48fda8328cf91f329dc3bf
Parents: 591841f
Author: guowei.mgw <gu...@gmail.com>
Authored: Mon Apr 24 11:47:47 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 28 20:09:11 2017 +0200
----------------------------------------------------------------------
.../checkpoint/StateAssignmentOperationV2.java | 458 +++++++++++++++++++
1 file changed, 458 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8045faba/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
new file mode 100644
index 0000000..83c188c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.checkpoint;
+
+ //import com.google.common.collect.Lists;
+ import org.apache.flink.api.java.tuple.Tuple2;
+ import org.apache.flink.runtime.executiongraph.Execution;
+ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+ import org.apache.flink.runtime.jobgraph.JobVertexID;
+ import org.apache.flink.runtime.state.ChainedStateHandle;
+ import org.apache.flink.runtime.state.KeyGroupRange;
+ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+ import org.apache.flink.runtime.state.KeyedStateHandle;
+ import org.apache.flink.runtime.state.OperatorStateHandle;
+ import org.apache.flink.runtime.state.StreamStateHandle;
+ import org.apache.flink.runtime.state.TaskStateHandles;
+ import org.apache.flink.util.Preconditions;
+ import org.slf4j.Logger;
+
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+/**
+ * This class encapsulates the operation of assigning restored state when restoring from a checkpoint that works on the
+ * granularity of operators. This is the case for checkpoints that were created either with a Flink version >= 1.3 or
+ * 1.2 if the savepoint only contains {@link SubtaskState}s for which the length of contained
+ * {@link ChainedStateHandle}s is equal to 1.
+ */
+public class StateAssignmentOperationV2 {
+
+ private final Logger logger;
+ private final Map<JobVertexID, ExecutionJobVertex> tasks;
+ private final Map<JobVertexID, TaskState> taskStates;
+ private final boolean allowNonRestoredState;
+
+ public StateAssignmentOperationV2(
+ Logger logger,
+ Map<JobVertexID, ExecutionJobVertex> tasks,
+ Map<JobVertexID, TaskState> taskStates,
+ boolean allowNonRestoredState) {
+
+ this.logger = Preconditions.checkNotNull(logger);
+ this.tasks = Preconditions.checkNotNull(tasks);
+ this.taskStates = Preconditions.checkNotNull(taskStates);
+ this.allowNonRestoredState = allowNonRestoredState;
+ }
+
+ public boolean assignStates() throws Exception {
+ Map<JobVertexID, TaskState> localStates = new HashMap<>(taskStates);
+ Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
+
+ Set<JobVertexID> allOperatorIDs = new HashSet<>();
+ for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+ //allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+ }
+ for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
+ TaskState taskState = taskGroupStateEntry.getValue();
+ //----------------------------------------find operator for state---------------------------------------------
+
+ if (!allOperatorIDs.contains(taskGroupStateEntry.getKey())) {
+ if (allowNonRestoredState) {
+ logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
+ continue;
+ } else {
+ throw new IllegalStateException("There is no operator for the state " + taskState.getJobVertexID());
+ }
+ }
+ }
+
+ for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
+ final ExecutionJobVertex executionJobVertex = task.getValue();
+
+ // find the states of all operators belonging to this task
+ JobVertexID[] operatorIDs = null;//executionJobVertex.getOperatorIDs();
+ JobVertexID[] altOperatorIDs = null;//executionJobVertex.getUserDefinedOperatorIDs();
+ List<TaskState> operatorStates = new ArrayList<>();
+ boolean statelessTask = true;
+ for (int x = 0; x < operatorIDs.length; x++) {
+ JobVertexID operatorID = altOperatorIDs[x] == null
+ ? operatorIDs[x]
+ : altOperatorIDs[x];
+
+ TaskState operatorState = localStates.remove(operatorID);
+ if (operatorState == null) {
+ operatorState = new TaskState(
+ operatorID,
+ executionJobVertex.getParallelism(),
+ executionJobVertex.getMaxParallelism(),
+ 1);
+ } else {
+ statelessTask = false;
+ }
+ operatorStates.add(operatorState);
+ }
+ if (statelessTask) { // skip tasks where no operator has any state
+ continue;
+ }
+
+ assignAttemptState(task.getValue(), operatorStates);
+ }
+
+ return true;
+ }
+
+ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<TaskState> operatorStates) {
+
+ JobVertexID[] operatorIDs = null;//executionJobVertex.getOperatorIDs();
+
+ //1. first compute the new parallelism
+ checkParallelismPreconditions(operatorStates, executionJobVertex);
+
+ int newParallelism = executionJobVertex.getParallelism();
+
+ List<KeyGroupRange> keyGroupPartitions = null;//StateAssignmentOperationUtils.createKeyGroupPartitions(
+ //executionJobVertex.getMaxParallelism(),
+ //newParallelism);
+
+ //2. Redistribute the operator state.
+ /**
+ *
+ * Redistribute ManagedOperatorStates and RawOperatorStates from old parallelism to new parallelism.
+ *
+ * The old ManagedOperatorStates with old parallelism 3:
+ *
+ * parallelism0 parallelism1 parallelism2
+ * op0 states0,0 state0,1 state0,2
+ * op1
+ * op2 states2,0 state2,1 state1,2
+ * op3 states3,0 state3,1 state3,2
+ *
+ * The new ManagedOperatorStates with new parallelism 4:
+ *
+ * parallelism0 parallelism1 parallelism2 parallelism3
+ * op0 state0,0 state0,1 state0,2 state0,3
+ * op1
+ * op2 state2,0 state2,1 state2,2 state2,3
+ * op3 state3,0 state3,1 state3,2 state3,3
+ */
+ List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates = new ArrayList<>();
+ List<List<Collection<OperatorStateHandle>>> newRawOperatorStates = new ArrayList<>();
+
+ reDistributePartitionableStates(operatorStates, newParallelism, newManagedOperatorStates, newRawOperatorStates);
+
+
+ //3. Compute TaskStateHandles of every subTask in the executionJobVertex
+ /**
+ * An executionJobVertex's all state handles needed to restore are something like a matrix
+ *
+ * parallelism0 parallelism1 parallelism2 parallelism3
+ * op0 sh(0,0) sh(0,1) sh(0,2) sh(0,3)
+ * op1 sh(1,0) sh(1,1) sh(1,2) sh(1,3)
+ * op2 sh(2,0) sh(2,1) sh(2,2) sh(2,3)
+ * op3 sh(3,0) sh(3,1) sh(3,2) sh(3,3)
+ *
+ * we will compute the state handles column by column.
+ *
+ */
+ for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
+
+ Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
+ .getCurrentExecutionAttempt();
+
+ List<StreamStateHandle> subNonPartitionableState = new ArrayList<>();
+
+ Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedState = null;
+
+ List<Collection<OperatorStateHandle>> subManagedOperatorState = new ArrayList<>();
+ List<Collection<OperatorStateHandle>> subRawOperatorState = new ArrayList<>();
+
+
+ for (int operatorIndex = 0; operatorIndex < operatorIDs.length; operatorIndex++) {
+ TaskState operatorState = operatorStates.get(operatorIndex);
+ int oldParallelism = operatorState.getParallelism();
+
+ // NonPartitioned State
+
+ reAssignSubNonPartitionedStates(
+ operatorState,
+ subTaskIndex,
+ newParallelism,
+ oldParallelism,
+ subNonPartitionableState);
+
+ // PartitionedState
+ reAssignSubPartitionableState(newManagedOperatorStates,
+ newRawOperatorStates,
+ subTaskIndex,
+ operatorIndex,
+ subManagedOperatorState,
+ subRawOperatorState);
+
+ // KeyedState
+ if (operatorIndex == operatorIDs.length - 1) {
+ subKeyedState = reAssignSubKeyedStates(operatorState,
+ keyGroupPartitions,
+ subTaskIndex,
+ newParallelism,
+ oldParallelism);
+
+ }
+ }
+
+
+ // check if a stateless task
+ if (!allElementsAreNull(subNonPartitionableState) ||
+ !allElementsAreNull(subManagedOperatorState) ||
+ !allElementsAreNull(subRawOperatorState) ||
+ subKeyedState != null) {
+
+ TaskStateHandles taskStateHandles = new TaskStateHandles(
+
+ new ChainedStateHandle<>(subNonPartitionableState),
+ subManagedOperatorState,
+ subRawOperatorState,
+ subKeyedState != null ? subKeyedState.f0 : null,
+ subKeyedState != null ? subKeyedState.f1 : null);
+
+ currentExecutionAttempt.setInitialState(taskStateHandles);
+ }
+ }
+ }
+
+
+ public void checkParallelismPreconditions(List<TaskState> operatorStates, ExecutionJobVertex executionJobVertex) {
+
+ for (TaskState taskState : operatorStates) {
+ //StateAssignmentOperation.checkParallelismPreconditions(taskState, executionJobVertex, this.logger);
+ }
+ }
+
+
+ private void reAssignSubPartitionableState(
+ List<List<Collection<OperatorStateHandle>>> newMangedOperatorStates,
+ List<List<Collection<OperatorStateHandle>>> newRawOperatorStates,
+ int subTaskIndex, int operatorIndex,
+ List<Collection<OperatorStateHandle>> subManagedOperatorState,
+ List<Collection<OperatorStateHandle>> subRawOperatorState) {
+
+ if (newMangedOperatorStates.get(operatorIndex) != null) {
+ subManagedOperatorState.add(newMangedOperatorStates.get(operatorIndex).get(subTaskIndex));
+ } else {
+ subManagedOperatorState.add(null);
+ }
+ if (newRawOperatorStates.get(operatorIndex) != null) {
+ subRawOperatorState.add(newRawOperatorStates.get(operatorIndex).get(subTaskIndex));
+ } else {
+ subRawOperatorState.add(null);
+ }
+
+
+ }
+
+ private Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> reAssignSubKeyedStates(
+ TaskState operatorState,
+ List<KeyGroupRange> keyGroupPartitions,
+ int subTaskIndex,
+ int newParallelism,
+ int oldParallelism) {
+
+ Collection<KeyedStateHandle> subManagedKeyedState;
+ Collection<KeyedStateHandle> subRawKeyedState;
+
+ if (newParallelism == oldParallelism) {
+ if (operatorState.getState(subTaskIndex) != null) {
+ KeyedStateHandle oldSubManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState();
+ KeyedStateHandle oldSubRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState();
+ subManagedKeyedState = oldSubManagedKeyedState != null ? Collections.singletonList(
+ oldSubManagedKeyedState) : null;
+ subRawKeyedState = oldSubRawKeyedState != null ? Collections.singletonList(
+ oldSubRawKeyedState) : null;
+ } else {
+ subManagedKeyedState = null;
+ subRawKeyedState = null;
+ }
+ } else {
+ subManagedKeyedState = getManagedKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
+ subRawKeyedState = getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
+ }
+ if (subManagedKeyedState == null && subRawKeyedState == null) {
+ return null;
+ }
+ return new Tuple2<>(subManagedKeyedState, subRawKeyedState);
+ }
+
+
+ private <X> boolean allElementsAreNull(List<X> nonPartitionableStates) {
+ for (Object streamStateHandle : nonPartitionableStates) {
+ if (streamStateHandle != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ private void reAssignSubNonPartitionedStates(
+ TaskState operatorState,
+ int subTaskIndex,
+ int newParallelism,
+ int oldParallelism,
+ List<StreamStateHandle> subNonPartitionableState) {
+ if (oldParallelism == newParallelism) {
+ if (operatorState.getState(subTaskIndex) != null &&
+ !operatorState.getState(subTaskIndex).getLegacyOperatorState().isEmpty()) {
+ subNonPartitionableState.add(operatorState.getState(subTaskIndex).getLegacyOperatorState().get(0));
+ } else {
+ subNonPartitionableState.add(null);
+ }
+ } else {
+ subNonPartitionableState.add(null);
+ }
+ }
+
+ private void reDistributePartitionableStates(
+ List<TaskState> operatorStates, int newParallelism,
+ List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates,
+ List<List<Collection<OperatorStateHandle>>> newRawOperatorStates) {
+
+ //collect the old partitionalbe state
+ List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>();
+ List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>();
+
+ collectPartionableStates(operatorStates, oldManagedOperatorStates, oldRawOperatorStates);
+
+
+ //redistribute
+ OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+
+ for (int operatorIndex = 0; operatorIndex < operatorStates.size(); operatorIndex++) {
+ int oldParallelism = operatorStates.get(operatorIndex).getParallelism();
+ //newManagedOperatorStates.add(StateAssignmentOperationUtils.applyRepartitioner(opStateRepartitioner,
+ // oldManagedOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
+ //newRawOperatorStates.add(StateAssignmentOperationUtils.applyRepartitioner(opStateRepartitioner,
+ // oldRawOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
+
+ }
+ }
+
+
+ private void collectPartionableStates(
+ List<TaskState> operatorStates,
+ List<List<OperatorStateHandle>> managedOperatorStates,
+ List<List<OperatorStateHandle>> rawOperatorStates) {
+
+ for (TaskState operatorState : operatorStates) {
+ List<OperatorStateHandle> managedOperatorState = null;
+ List<OperatorStateHandle> rawOperatorState = null;
+
+ for (int i = 0; i < operatorState.getParallelism(); i++) {
+ SubtaskState subtaskState = operatorState.getState(i);
+ if (subtaskState != null) {
+ if (subtaskState.getManagedOperatorState() != null &&
+ subtaskState.getManagedOperatorState().getLength() > 0 &&
+ subtaskState.getManagedOperatorState().get(0) != null) {
+ if (managedOperatorState == null) {
+ managedOperatorState = new ArrayList<>();
+ }
+ managedOperatorState.add(subtaskState.getManagedOperatorState().get(0));
+ }
+
+ if (subtaskState.getRawOperatorState() != null &&
+ subtaskState.getRawOperatorState().getLength() > 0 &&
+ subtaskState.getRawOperatorState().get(0) != null) {
+ if (rawOperatorState == null) {
+ rawOperatorState = new ArrayList<>();
+ }
+ rawOperatorState.add(subtaskState.getRawOperatorState().get(0));
+ }
+ }
+
+ }
+ managedOperatorStates.add(managedOperatorState);
+ rawOperatorStates.add(rawOperatorState);
+ }
+ }
+
+
+ /**
+ * Collect {@link KeyGroupsStateHandle managedKeyedStateHandles} which have intersection with given
+ * {@link KeyGroupRange} from {@link TaskState operatorState}
+ *
+ * @param operatorState all state handles of a operator
+ * @param subtaskKeyGroupRange the KeyGroupRange of a subtask
+ * @return all managedKeyedStateHandles which have intersection with given KeyGroupRange
+ */
+ public static List<KeyedStateHandle> getManagedKeyedStateHandles(
+ TaskState operatorState,
+ KeyGroupRange subtaskKeyGroupRange) {
+
+ List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+
+ for (int i = 0; i < operatorState.getParallelism(); i++) {
+ if (operatorState.getState(i) != null && operatorState.getState(i).getManagedKeyedState() != null) {
+ KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getManagedKeyedState().getIntersection(subtaskKeyGroupRange);
+
+ if (intersectedKeyedStateHandle != null) {
+ if (subtaskKeyedStateHandles == null) {
+ subtaskKeyedStateHandles = new ArrayList<>();
+ }
+ subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
+ }
+ }
+ }
+
+ return subtaskKeyedStateHandles;
+ }
+
+ /**
+ * Collect {@link KeyGroupsStateHandle rawKeyedStateHandles} which have intersection with given
+ * {@link KeyGroupRange} from {@link TaskState operatorState}
+ *
+ * @param operatorState all state handles of a operator
+ * @param subtaskKeyGroupRange the KeyGroupRange of a subtask
+ * @return all rawKeyedStateHandles which have intersection with given KeyGroupRange
+ */
+ public static List<KeyedStateHandle> getRawKeyedStateHandles(
+ TaskState operatorState,
+ KeyGroupRange subtaskKeyGroupRange) {
+
+ List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+
+ for (int i = 0; i < operatorState.getParallelism(); i++) {
+ if (operatorState.getState(i) != null && operatorState.getState(i).getRawKeyedState() != null) {
+ KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getRawKeyedState().getIntersection(subtaskKeyGroupRange);
+
+ if (intersectedKeyedStateHandle != null) {
+ if (subtaskKeyedStateHandles == null) {
+ subtaskKeyedStateHandles = new ArrayList<>();
+ }
+ subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
+ }
+ }
+ }
+
+ return subtaskKeyedStateHandles;
+ }
+}