Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2017/07/17 13:08:13 UTC

[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4353

    [FLINK-7213] Introduce state management by OperatorID in TaskManager

    FLINK-5892 introduced the job manager / checkpoint coordinator part of managing state on the operator level instead of the task level by introducing explicit operator_id -> state mappings.
    
    However, this explicit mapping was not introduced on the task manager side, so the explicit mapping is still converted into a mapping that suits the implicit operator chain order.
    
    This PR introduces this part and offers explicit state management by operator_id in the task manager.
    
    Furthermore, this PR also introduces `TaskStateSnapshot` as a unifying abstraction to replace `TaskStateHandles` and `SubtaskState`, which were always very similar, except that one offered collections of state handles (to support scaling in on restore) while the other only contained single objects (because each state is snapshotted into one state handle).
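
To illustrate the difference between a mapping that follows the implicit operator chain order and an explicit operator_id -> state mapping, here is a minimal sketch. The class `StateLookupSketch`, its method names, and the string-valued ids and states are hypothetical stand-ins, not Flink's `OperatorID` or state handle types.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in that contrasts positional and explicit state lookup. */
final class StateLookupSketch {

    /** Implicit mapping: the state is found by the operator's position in the chain. */
    static String byChainPosition(List<String> statesInChainOrder, int chainIndex) {
        return statesInChainOrder.get(chainIndex);
    }

    /** Explicit mapping: the state is found by operator id, independent of chain order. */
    static String byOperatorId(Map<String, String> statesByOperatorId, String operatorId) {
        return statesByOperatorId.get(operatorId);
    }
}
```

With the positional variant, reordering the chain changes which state index 0 refers to; with the explicit variant, `byOperatorId(map, "op-b")` keeps returning the same entry regardless of order.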

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink tmpBU

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4353
    
----
commit d68dd39b343595120f62fc8280b2f3c5f0ee7503
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-06-26T16:07:59Z

    [FLINK-7213] Introduce state management by OperatorID in TaskManager

commit 11cdd85668aa18f8e5bab0e6cac9ba082bfea95c
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-11T15:10:03Z

    [FLINK-7213] Introduce TaskStateSnapshot to unify TaskStateHandles and SubtaskState

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    I think that idea is problematic because in the rescaling case, all the collections can have different sizes. For example, there can be 5 managed keyed state handles, 7 managed operator state handles, and zero state handles for the raw state. Then how would you split that up between the `OperatorSubtaskStates` in your set? As it is, `OperatorSubtaskState` contains the complete state for an operator subtask, which I think is a good thing. Also, maybe at some point there *might* be a reason to report more than one handle already on snapshotting.
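
The rescaling example above can be sketched as a single container whose collections have independent sizes. `SubtaskStateSketch`, its fields, and the string placeholders for handles are assumptions made for illustration; they are not the actual `OperatorSubtaskState` API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical simplification of a per-subtask state container; handles are plain strings. */
final class SubtaskStateSketch {
    final Collection<String> managedKeyedState;
    final Collection<String> managedOperatorState;
    final Collection<String> rawState;

    SubtaskStateSketch(
            Collection<String> managedKeyedState,
            Collection<String> managedOperatorState,
            Collection<String> rawState) {
        // Copy each collection so the container holds the complete, fixed state of one subtask.
        this.managedKeyedState = Collections.unmodifiableList(new ArrayList<>(managedKeyedState));
        this.managedOperatorState = Collections.unmodifiableList(new ArrayList<>(managedOperatorState));
        this.rawState = Collections.unmodifiableList(new ArrayList<>(rawState));
    }

    /** Builds n placeholder handle names such as "keyed-0", "keyed-1", ... */
    static List<String> handles(String prefix, int n) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add(prefix + "-" + i);
        }
        return result;
    }

    /** The case from the comment: 5 keyed, 7 operator, and 0 raw handles after rescaling. */
    static SubtaskStateSketch rescaledExample() {
        return new SubtaskStateSketch(handles("keyed", 5), handles("operator", 7), handles("raw", 0));
    }
}
```

Because the three sizes are unrelated, there is no natural way to pair entries one-to-one across several smaller containers, which is the concern raised in the comment.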



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    Concerning the suggestion about the `MultiStreamStateHandle` - I am not sure that this can always work. Different physical files may have headers, so it may be important to recognize them as different chunks of state in the general case.
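
A small sketch of why per-file headers matter when several chunks are presented as one stream. The header layout (a magic number followed by a payload length) and the class `ChunkedStateSketch` are assumptions for illustration only; they do not describe Flink's on-disk format or how `MultiStreamStateHandle` behaves.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical chunk format: [int magic][int payloadLength][payload bytes]. */
final class ChunkedStateSketch {
    static final int MAGIC = 0xCAFE;

    /** Writes one chunk, i.e. one "physical file" with its own header. */
    static byte[] writeChunk(byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putInt(MAGIC)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Concatenates two chunks, as a naive multi-stream view would. */
    static byte[] concat(byte[] a, byte[] b) {
        byte[] result = new byte[a.length + b.length];
        System.arraycopy(a, 0, result, 0, a.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }

    /** Chunk-aware reader: consumes one header per chunk and returns the payloads. */
    static List<byte[]> readChunks(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        List<byte[]> payloads = new ArrayList<>();
        while (in.hasRemaining()) {
            if (in.getInt() != MAGIC) {
                throw new IllegalStateException("not at a chunk boundary");
            }
            byte[] payload = new byte[in.getInt()];
            in.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }
}
```

In this sketch, a reader that expects only one header at the start of the concatenated bytes would treat the second chunk's header as payload, so the reader has to know where each chunk begins.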



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133016663
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -75,31 +103,79 @@
     	 */
     	private final long stateSize;
     
    +	@VisibleForTesting
    +	public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
    +
    +		this(legacyOperatorState,
    +			Collections.<OperatorStateHandle>emptyList(),
    +			Collections.<OperatorStateHandle>emptyList(),
    +			Collections.<KeyedStateHandle>emptyList(),
    +			Collections.<KeyedStateHandle>emptyList());
    +	}
    +
    +	/**
    +	 * Empty state.
    +	 */
    +	public OperatorSubtaskState() {
    --- End diff --
    
    Minor optimization: One could make this constructor `private` and have a field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd leave it to you to decide whether it is worth doing.
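
A minimal sketch of the suggested pattern, using a hypothetical class `EmptyPlaceholderSketch` with string placeholders for handles rather than the real `OperatorSubtaskState`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical class showing a shared EMPTY instance behind a private no-arg constructor. */
final class EmptyPlaceholderSketch {

    /** Shared placeholder for "no state"; safe to share because instances are immutable. */
    static final EmptyPlaceholderSketch EMPTY = new EmptyPlaceholderSketch();

    private final List<String> handles;

    /** Private: callers use EMPTY instead of allocating a new empty instance. */
    private EmptyPlaceholderSketch() {
        this.handles = Collections.emptyList();
    }

    EmptyPlaceholderSketch(List<String> handles) {
        this.handles = Collections.unmodifiableList(new ArrayList<>(handles));
    }

    boolean hasState() {
        return !handles.isEmpty();
    }
}
```

Callers would then write `EmptyPlaceholderSketch.EMPTY` wherever an empty state is needed, which avoids repeated allocations and makes emptiness checks by reference possible.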



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129019373
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---
    @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
     					throw new RuntimeException(e);
     				}
     
    +				boolean hasKeyedManagedKeyedState = false;
    --- End diff --
    
    -> `hasManagedKeyedState`?



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129020085
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -164,6 +269,7 @@ public long getStateSize() {
     
     	// --------------------------------------------------------------------------------------------
     
    +
    --- End diff --
    
    remove this empty line



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:

    https://github.com/apache/flink/pull/4353



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129272254
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + * <p>This class should be called TaskState once the old class with this name that we keep for backwards
    --- End diff --
    
    add empty line before paragraph



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129274620
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -3631,16 +3673,16 @@ public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Excepti
     			completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast());
     	}
     
    -	private static final class SpyInjectingOperatorState extends OperatorState {
    -
    -		private static final long serialVersionUID = -4004437428483663815L;
    -
    -		public SpyInjectingOperatorState(OperatorID taskID, int parallelism, int maxParallelism) {
    -			super(taskID, parallelism, maxParallelism);
    -		}
    -
    -		public void putState(int subtaskIndex, OperatorSubtaskState subtaskState) {
    -			super.putState(subtaskIndex, spy(subtaskState));
    -		}
    -	}
    +//	private static final class SpyInjectingOperatorState extends OperatorState {
    --- End diff --
    
    you can remove this (which is _really_ great...)



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133010887
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * <p>This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    --- End diff --
    
    Would it make sense to make this immutable? It looks like this should not be modified any more after fully constructing it. This would also make it clear that methods iterating over the state, or returning sets / iterables can never fail with concurrent modifications.
    
    For example the `size` method is considered a "best effort" method for info purposes only, and should not fail with an exception (it currently could fail with a `ConcurrentModificationException`).
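
A sketch of what an immutable variant could look like, assuming a hypothetical class `ImmutableSnapshotSketch` that maps string ids to state sizes. It is not the `TaskStateSnapshot` implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable snapshot: the map is copied once and never modified afterwards. */
final class ImmutableSnapshotSketch {

    private final Map<String, Long> stateSizeByOperatorId;

    ImmutableSnapshotSketch(Map<String, Long> stateSizeByOperatorId) {
        // Defensive copy: later changes to the caller's map cannot affect this instance.
        this.stateSizeByOperatorId =
                Collections.unmodifiableMap(new HashMap<>(stateSizeByOperatorId));
    }

    /** Iterates a map that never changes, so it cannot hit a ConcurrentModificationException. */
    long totalSize() {
        long sum = 0L;
        for (long size : stateSizeByOperatorId.values()) {
            sum += size;
        }
        return sum;
    }

    /** Read-only view; attempts to modify it throw UnsupportedOperationException. */
    Map<String, Long> entries() {
        return stateSizeByOperatorId;
    }
}
```

The trade-off is that all entries must be known at construction time, for example by collecting them in a builder or a temporary map first.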



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133022095
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
     			assertFalse(checkpoint.isDiscarded());
     			assertFalse(checkpoint.isFullyAcknowledged());
     
    -			OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    -			OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    -
    -			Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
    -
    -			operatorStates.put(opID1, new SpyInjectingOperatorState(
    -				opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
    -			operatorStates.put(opID2, new SpyInjectingOperatorState(
    -				opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
    -
     			// check that the vertices received the trigger checkpoint message
     			{
     				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
     				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
     			}
     
    +			OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    +			OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    +			TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
    --- End diff --
    
    Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking?



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129269706
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---
    @@ -89,6 +89,10 @@ private GroupByStateNameResults groupByStateName(
     
     		for (OperatorStateHandle psh : previousParallelSubtaskStates) {
     
    +			if(psh == null) {
    --- End diff --
    
    missing space after if



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129266693
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -18,20 +18,40 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.state.CompositeStateHandle;
     import org.apache.flink.runtime.state.KeyedStateHandle;
     import org.apache.flink.runtime.state.OperatorStateHandle;
     import org.apache.flink.runtime.state.SharedStateRegistry;
     import org.apache.flink.runtime.state.StateObject;
     import org.apache.flink.runtime.state.StateUtil;
     import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Arrays;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
     
     /**
    - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}.
    + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
    + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
    + * parallel tasks that physically execute parallelized, physical instances of the operator.
    + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
    + * {@link OperatorSubtaskState}s.
    + * <p>Typically, we expect all collections in this class to be of size 0 or 1, because there is up to one state handle
    + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot.
    + * The purpose of having the state handles in collections is that this class is also reused in restoring state.
    + * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In
    --- End diff --
    
    In the master, we used two different classes for this purpose: `OperatorSubtaskState` to report from task to master, and `TaskStateHandles` to restore from master to task. The difference is that in the first all fields hold single objects, while in the second all fields are collections. Otherwise, their purpose is identical, so I collapsed them into one class.



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133018189
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<Oper
     					subNonPartitionableState);
     
     				// PartitionedState
    -				reAssignSubPartitionableState(newManagedOperatorStates,
    +				reAssignSubPartitionableState(
    +					newManagedOperatorStates,
     					newRawOperatorStates,
     					subTaskIndex,
     					operatorIndex,
     					subManagedOperatorState,
     					subRawOperatorState);
     
     				// KeyedState
    -				if (operatorIndex == operatorIDs.size() - 1) {
    -					subKeyedState = reAssignSubKeyedStates(operatorState,
    +				if (isHeadOperator(operatorIndex, operatorIDs)) {
    --- End diff --
    
    Do we need this check here? From the JobManager and CheckpointCoordinator side, nothing should prevent non-head operators from having keyed state. It is just a limitation in the current API.
    
    This check seems to "enforce" an API limitation in a more general runtime that does not actually have a need for that restriction.
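
For reference, the check under discussion can be sketched as follows. The body of `isHeadOperator` is an assumption inferred from the removed inline condition in the diff (`operatorIndex == operatorIDs.size() - 1`); the actual helper in the PR is not shown in this thread, and `HeadOperatorSketch` is a hypothetical wrapper class.

```java
import java.util.List;

/** Hypothetical wrapper around the head-operator check referenced in the diff. */
final class HeadOperatorSketch {

    /**
     * Assumed equivalent of the replaced inline condition: the head operator
     * is the last entry in the list of operator ids.
     */
    static boolean isHeadOperator(int operatorIndex, List<?> operatorIDs) {
        return operatorIndex == operatorIDs.size() - 1;
    }
}
```

Dropping the guard, as the comment proposes, would mean reassigning keyed state for every operator index rather than only for the one where this method returns `true`.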



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129275823
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
     	}
     
     	@Override
    -	public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    +	public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
     
     		Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
     
    -		boolean restoring = null != stateHandles;
    +		boolean restoring = (null != stateHandles);
    --- End diff --
    
    why did you add the parentheses?



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    CC @StephanEwen @zentol 



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133021720
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -850,18 +843,20 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
     			OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
     			OperatorID opID3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());
     
    -			Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
    +			TaskStateSnapshot taskOperatorSubtaskStates1_1 = spy(new TaskStateSnapshot());
    --- End diff --
    
    Is spying necessary here? There seem to be no `verify()` calls on this type...



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133026125
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * <p>This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/** Mapping from an operator id to the state of one subtask of this operator */
    +	private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
    --- End diff --
    
    Hmm, I think if we consider default load factors and large sizes, I would pick a linear array scan with a minimum hit rate above 30% over a random-access iteration with a 100% hit rate. For all expected sizes (in cache) in this class, it should not matter. A `LinkedHashMap` (LHM) also consumes a bit more memory. I would tend to keep it this way.
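
For context, the functional difference between the two map types can be sketched as follows; the performance trade-off described above is not measured here. `MapIterationSketch` and its sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper comparing iteration over HashMap and LinkedHashMap. */
final class MapIterationSketch {

    /** Inserts three sample entries in a fixed order and returns the same map. */
    static Map<String, Integer> fill(Map<String, Integer> target) {
        target.put("op-c", 3);
        target.put("op-a", 1);
        target.put("op-b", 2);
        return target;
    }

    /** Returns the keys in the order in which the map iterates over them. */
    static List<String> keysInIterationOrder(Map<String, Integer> map) {
        return new ArrayList<>(map.keySet());
    }
}
```

Passing a `java.util.LinkedHashMap` to `fill` yields the keys in insertion order, while a `java.util.HashMap` yields the same keys in an unspecified order. If no caller depends on iteration order, the plain `HashMap` is sufficient.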



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133009796
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * <p>This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/** Mapping from an operator id to the state of one subtask of this operator */
    +	private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
    --- End diff --
    
    A `LinkedHashMap` has a slightly more predictable iteration performance (list traversal) compared to a `HashMap` (search through sparse table array). There are a lot of value iterations done in this class, but we also should have pretty full hash tables (since we never delete), so I am not sure how much difference it makes...
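
For illustration, a minimal sketch of the two map types discussed here. The class and method names are hypothetical and not part of the PR. It only shows the documented difference in iteration order and says nothing about the relative speed debated above: `LinkedHashMap` iterates its entries in insertion order by following a linked list, while `HashMap` walks its bucket table and makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, for illustration only.
public class MapIterationSketch {

    // Inserts three entries in the order 3, 2, 1 and returns the values
    // in the iteration order of the given map.
    static List<String> valuesAfterInsert(Map<Integer, String> map) {
        map.put(3, "c");
        map.put(2, "b");
        map.put(1, "a");
        return new ArrayList<>(map.values());
    }

    // LinkedHashMap: iteration follows insertion order.
    static List<String> linkedOrder() {
        return valuesAfterInsert(new LinkedHashMap<Integer, String>());
    }

    // HashMap: iteration walks the bucket table; the order is unspecified.
    static List<String> hashOrder() {
        return valuesAfterInsert(new HashMap<Integer, String>());
    }

    public static void main(String[] args) {
        System.out.println("LinkedHashMap: " + linkedOrder());
        System.out.println("HashMap:       " + hashOrder());
    }
}
```

Only the `LinkedHashMap` order should be relied upon; the `HashMap` result contains the same values but in no guaranteed order.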



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129274874
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---
    @@ -118,10 +118,22 @@ public void testSetState() {
     			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
     			final long checkpointId = pending.getCheckpointId();
     
    -			SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
    -			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
    -			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
    -			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles));
    +			final TaskStateSnapshot subtaskStates = new TaskStateSnapshot();
    +
    +			subtaskStates.putSubtaskStateByOperatorID(
    +				OperatorID.fromJobVertexID(statefulId),
    +				new OperatorSubtaskState(
    +					serializedState.get(0),
    +					Collections.<OperatorStateHandle>emptyList(),
    +					Collections.<OperatorStateHandle>emptyList(),
    +					Collections.singletonList(serializedKeyGroupStates),
    +					Collections.<KeyedStateHandle>emptyList()));
    +
    +			//SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
    --- End diff --
    
    What's up with this line?



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    Thanks for the review @StephanEwen! As I already have some branches that build upon this and touch similar places in the code, I would suggest merging this as is and introducing the polishing changes you suggested afterwards in another commit.



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    I had a very rough look at it, and the conceptual rework looks very good.
    
    This would need a detailed pass over the code changes, though, since it touches very sensitive code...



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129278337
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -75,31 +103,84 @@
     	 */
     	private final long stateSize;
     
    +	@VisibleForTesting
    +	public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
    --- End diff --
    
    Should this constructor call the other one?
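
A minimal sketch of the constructor delegation this question refers to. The class below is a hypothetical stand-in, not the actual `OperatorSubtaskState`: plain strings take the place of state handles, and the size computation is invented for the example. The point is only that the convenience constructor forwards to the full one via `this(...)`, so null checks and derived fields are computed in one place.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

// Hypothetical stand-in class, for illustration only.
public class SubtaskStateSketch {

    private final String legacyState; // stand-in for a legacy state handle, may be null
    private final Collection<String> managedOperatorState;
    private final long stateSize;

    // Convenience constructor: delegates to the full constructor.
    public SubtaskStateSketch(String legacyState) {
        this(legacyState, Collections.<String>emptyList());
    }

    // Full constructor: the only place that validates input and derives the size.
    public SubtaskStateSketch(String legacyState, Collection<String> managedOperatorState) {
        this.legacyState = legacyState;
        this.managedOperatorState = Objects.requireNonNull(managedOperatorState);

        long size = (legacyState == null) ? 0 : legacyState.length();
        for (String handle : managedOperatorState) {
            size += handle.length();
        }
        this.stateSize = size;
    }

    public long getStateSize() {
        return stateSize;
    }
}
```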



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r127709120
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -18,20 +18,40 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.state.CompositeStateHandle;
     import org.apache.flink.runtime.state.KeyedStateHandle;
     import org.apache.flink.runtime.state.OperatorStateHandle;
     import org.apache.flink.runtime.state.SharedStateRegistry;
     import org.apache.flink.runtime.state.StateObject;
     import org.apache.flink.runtime.state.StateUtil;
     import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Arrays;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
     
     /**
    - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}.
    + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
    + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
    + * parallel tasks that physically execute parallelized, physical instances of the operator.
    + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
    --- End diff --
    
    Please add an empty line before the `<p>` tag so we have to make fewer changes when activating checkstyle.



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r127708906
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -18,20 +18,40 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.state.CompositeStateHandle;
     import org.apache.flink.runtime.state.KeyedStateHandle;
     import org.apache.flink.runtime.state.OperatorStateHandle;
     import org.apache.flink.runtime.state.SharedStateRegistry;
     import org.apache.flink.runtime.state.StateObject;
     import org.apache.flink.runtime.state.StateUtil;
     import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Arrays;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
     
     /**
    - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}.
    + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
    + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
    + * parallel tasks that physically execute parallelized, physical instances of the operator.
    + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
    + * {@link OperatorSubtaskState}s.
    + * <p>Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle
    + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot.
    + * The purpose of having the state handles in collections is that this class is also reused in restoring state.
    + * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In
    + * scale-down, one operator subtask can become responsible for the state of multiple previous subtasks. The collections
    + * can then store all the state handles that are relevant to build up the new subtask state.
    + * <p>There is no collection for legacy state because it is nor rescalable.
    --- End diff --
    
    typo: nor -> not



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129278519
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -18,20 +18,43 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.state.CompositeStateHandle;
     import org.apache.flink.runtime.state.KeyedStateHandle;
     import org.apache.flink.runtime.state.OperatorStateHandle;
     import org.apache.flink.runtime.state.SharedStateRegistry;
     import org.apache.flink.runtime.state.StateObject;
     import org.apache.flink.runtime.state.StateUtil;
     import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Arrays;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
     
     /**
    - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}.
    + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
    + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
    + * parallel tasks that physically execute parallelized, physical instances of the operator.
    + *
    + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
    + * {@link OperatorSubtaskState}s.
    + *
    + * <p>Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle
    --- End diff --
    
    because there **is**



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129020863
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -18,20 +18,40 @@
     
     package org.apache.flink.runtime.checkpoint;
     
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.state.CompositeStateHandle;
     import org.apache.flink.runtime.state.KeyedStateHandle;
     import org.apache.flink.runtime.state.OperatorStateHandle;
     import org.apache.flink.runtime.state.SharedStateRegistry;
     import org.apache.flink.runtime.state.StateObject;
     import org.apache.flink.runtime.state.StateUtil;
     import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Arrays;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
     
     /**
    - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}.
    + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
    + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all
    + * parallel tasks that physically execute parallelized, physical instances of the operator.
    + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of
    + * {@link OperatorSubtaskState}s.
    + * <p>Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle
    + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot.
    + * The purpose of having the state handles in collections is that this class is also reused in restoring state.
    + * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In
    --- End diff --
    
    How come we don't need this in the current master, where this class is also used for restoring state?



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129277176
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -867,81 +845,60 @@ public String toString() {
     
     		AsyncCheckpointRunnable(
     				StreamTask<?, ?> owner,
    -				List<StreamStateHandle> nonPartitionedStateHandles,
    -				List<OperatorSnapshotResult> snapshotInProgressList,
    +				Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles,
    +				Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress,
     				CheckpointMetaData checkpointMetaData,
     				CheckpointMetrics checkpointMetrics,
     				long asyncStartNanos) {
     
     			this.owner = Preconditions.checkNotNull(owner);
    -			this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList);
    +			this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
     			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
     			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
     			this.nonPartitionedStateHandles = nonPartitionedStateHandles;
     			this.asyncStartNanos = asyncStartNanos;
    -
    -			if (!snapshotInProgressList.isEmpty()) {
    -				// TODO Currently only the head operator of a chain can have keyed state, so simply access it directly.
    -				int headIndex = snapshotInProgressList.size() - 1;
    -				OperatorSnapshotResult snapshotInProgress = snapshotInProgressList.get(headIndex);
    -				if (null != snapshotInProgress) {
    -					this.futureKeyedBackendStateHandles = snapshotInProgress.getKeyedStateManagedFuture();
    -					this.futureKeyedStreamStateHandles = snapshotInProgress.getKeyedStateRawFuture();
    -				}
    -			}
     		}
     
     		@Override
     		public void run() {
     			FileSystemSafetyNet.initializeSafetyNetForThread();
     			try {
    -				// Keyed state handle future, currently only one (the head) operator can have this
    -				KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
    -				KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
    -
    -				List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size());
    -				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
    -
    -				for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) {
    -					if (null != snapshotInProgress) {
    -						operatorStatesBackend.add(
    -								FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
    -						operatorStatesStream.add(
    -								FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
    -					} else {
    -						operatorStatesBackend.add(null);
    -						operatorStatesStream.add(null);
    -					}
    -				}
    +				boolean hasState = false;
    +				final TaskStateSnapshot taskOperatorSubtaskStates =
    +					new TaskStateSnapshot(operatorSnapshotsInProgress.size());
     
    -				final long asyncEndNanos = System.nanoTime();
    -				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
    +				for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
     
    -				checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
    +					OperatorID operatorID = entry.getKey();
    +					OperatorSnapshotResult snapshotInProgress = entry.getValue();
     
    -				ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState =
    -						new ChainedStateHandle<>(nonPartitionedStateHandles);
    +					OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
    +						nonPartitionedStateHandles.get(operatorID),
    +						FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
    +						FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
    +						FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
    +						FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture())
    +					);
     
    -				ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend =
    -						new ChainedStateHandle<>(operatorStatesBackend);
    +					hasState |= operatorSubtaskState.hasState();
    +					taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
    +				}
     
    -				ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream =
    -						new ChainedStateHandle<>(operatorStatesStream);
    +				final long asyncEndNanos = System.nanoTime();
    +				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
     
    -				SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles(
    -						chainedNonPartitionedOperatorsState,
    -						chainedOperatorStateBackend,
    -						chainedOperatorStateStream,
    -						keyedStateHandleBackend,
    -						keyedStateHandleStream);
    +				checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
     
     				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
     						CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
     
    +					// we signal a stateless task by reporting null, so that there are no attempts to assign empty state
    +					// to stateless tasks on restore. This enables simple job modifications that only concern
    +					// stateless without the need to assign them uids to match their (always empty) states.
    --- End diff --
    
    stateless **tasks**
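
The quoted diff is cut off before the code that follows this comment, so the following is only a sketch of the idea described there, under stated assumptions. The class and method names are hypothetical, and strings stand in for operator ids and state handles. It accumulates a `hasState` flag across all operators of a chain and reports `null` instead of an empty snapshot when no operator produced state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
public class StatelessAckSketch {

    // Returns the snapshot to report for a task, or null if none of the
    // chained operators produced any state (a null value means "no state").
    static Map<String, String> snapshotToReport(Map<String, String> handleByOperator) {
        boolean hasState = false;
        Map<String, String> snapshot = new HashMap<>(handleByOperator.size());

        for (Map.Entry<String, String> entry : handleByOperator.entrySet()) {
            hasState |= (entry.getValue() != null);
            snapshot.put(entry.getKey(), entry.getValue());
        }

        // A stateless task is signalled by null rather than by an empty snapshot.
        return hasState ? snapshot : null;
    }
}
```

With this convention, a restore never tries to match empty state to a stateless task, which is the motivation given in the code comment above.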



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133027325
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * <p>This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    --- End diff --
    
    This class is totally intended to be immutable. So beyond what it is currently enforcing, do you suggest using immutable collections inside?
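
As a sketch of what "immutable collections inside" could look like, assuming a hypothetical class with strings in place of operator ids and subtask states (this is not the PR's `TaskStateSnapshot`): the constructor takes a defensive copy and wraps it with `Collections.unmodifiableMap`, so neither the caller's map nor the returned view can be used to change the snapshot afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in class, for illustration only.
public final class ImmutableSnapshotSketch {

    private final Map<String, String> stateByOperator;

    public ImmutableSnapshotSketch(Map<String, String> stateByOperator) {
        // Defensive copy first, then a read-only wrapper around the copy.
        this.stateByOperator = Collections.unmodifiableMap(new HashMap<>(stateByOperator));
    }

    // Returns a read-only view; put/remove throw UnsupportedOperationException.
    public Map<String, String> getStateByOperator() {
        return stateByOperator;
    }
}
```

The trade-off is that all entries must be known at construction time, so a class that is filled incrementally through a put method would need a builder or a final "seal" step instead.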



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129272281
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by t
    --- End diff --
    
    Add an empty line before the paragraph.



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r130295736
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
     	}
     
     	@Override
    -	public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    +	public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
     
     		Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
     
    -		boolean restoring = null != stateHandles;
    +		boolean restoring = (null != stateHandles);
    --- End diff --
    
    +1 to keep the parentheses.
    
    I think we should let contributors use such styles at their discretion.



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133013315
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---
    @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
     					throw new RuntimeException(e);
     				}
     
    +				boolean hasManagedKeyedState = false;
    +				for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
    +					OperatorSubtaskState state = entry.getValue();
    +					if (state != null) {
    +						hasManagedKeyedState |= state.getManagedKeyedState() != null;
    +					}
    +				}
    +
     				// should be one k/v state
    --- End diff --
    
    "should be **at least** one k/v state"?



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133021771
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
     			}
     			long checkpointId2 = pending2.getCheckpointId();
     
    -			Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
    +			TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot());
    --- End diff --
    
    Same as above: is spying necessary here?



[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129532719
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
     	}
     
     	@Override
    -	public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    +	public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
     
     		Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
     		Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
     
    -		boolean restoring = null != stateHandles;
    +		boolean restoring = (null != stateHandles);
    --- End diff --
    
    I like to do this when generating a boolean out of a `!=` or `==` comparison because I find this easier to read in the presence of more than one `=` character.



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    BTW, one alternative I once considered for the scale-down case is merging state handles that are backed by different physical files into one logical state handle, using something based on `MultiStreamStateHandle`. That would require minor changes in how the backends currently iterate the handles, plus some calculation of virtual offsets near the `StateAssignmentOperation` that maps the old physical file offsets to new logical offsets in a stream giving a consecutive logical view over the files. After that, the rest of the code would never have to deal with this detail again. I wonder whether this is worth the effort?
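
    To make the virtual-offset idea above concrete, here is a minimal sketch, not taken from the PR. `LogicalOffsetMapper` and all of its methods are hypothetical names, and it assumes the only input needed is the length of each physical file in the order the files should appear in the logical stream. It does not use or model `MultiStreamStateHandle` itself.

```java
/**
 * Hypothetical helper (not part of Flink): presents several physical files
 * as one consecutive logical stream by translating offsets in both directions.
 */
final class LogicalOffsetMapper {

    // startOffsets[i] is the logical offset at which physical file i begins;
    // the last entry is the total logical length.
    private final long[] startOffsets;

    LogicalOffsetMapper(long[] fileLengths) {
        startOffsets = new long[fileLengths.length + 1];
        for (int i = 0; i < fileLengths.length; i++) {
            if (fileLengths[i] < 0) {
                throw new IllegalArgumentException("negative length for file " + i);
            }
            startOffsets[i + 1] = startOffsets[i] + fileLengths[i];
        }
    }

    long totalLength() {
        return startOffsets[startOffsets.length - 1];
    }

    /** Maps an offset inside physical file fileIndex to its logical offset. */
    long toLogical(int fileIndex, long physicalOffset) {
        if (fileIndex < 0 || fileIndex >= startOffsets.length - 1) {
            throw new IndexOutOfBoundsException("no such file: " + fileIndex);
        }
        long length = startOffsets[fileIndex + 1] - startOffsets[fileIndex];
        if (physicalOffset < 0 || physicalOffset >= length) {
            throw new IndexOutOfBoundsException("offset outside file: " + physicalOffset);
        }
        return startOffsets[fileIndex] + physicalOffset;
    }

    /** Returns the index of the physical file that contains the logical offset. */
    int fileIndexOf(long logicalOffset) {
        if (logicalOffset < 0 || logicalOffset >= totalLength()) {
            throw new IndexOutOfBoundsException("offset outside stream: " + logicalOffset);
        }
        // A linear scan keeps the sketch simple and skips zero-length files.
        for (int i = 0; i < startOffsets.length - 1; i++) {
            if (logicalOffset < startOffsets[i + 1]) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable");
    }

    /** Maps a logical offset back to the offset inside its physical file. */
    long toPhysical(long logicalOffset) {
        return logicalOffset - startOffsets[fileIndexOf(logicalOffset)];
    }
}
```

    With file lengths of 100, 50 and 200 bytes, physical offset 10 in the second file corresponds to logical offset 110, and logical offset 150 maps back to offset 0 of the third file. A real implementation would probably replace the linear scan with a binary search once the number of files grows.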



[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4353
  
    Merged in b71154a734

