You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 11:56:10 UTC
[flink] 01/02: [FLINK-17988][checkpointing] Discard only unique channel state delegates
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b70ce9d1831877578374e357d19e1345c68646f9
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 28 00:09:36 2020 +0200
[FLINK-17988][checkpointing] Discard only unique channel state delegates
Several channel state handles can share the same underlying (delegate) state handle.
Discard should therefore iterate only over the unique underlying handles, so that each one is discarded once.
---
.../runtime/checkpoint/OperatorSubtaskState.java | 4 +-
.../runtime/state/AbstractChannelStateHandle.java | 13 ++++
.../CheckpointCoordinatorFailureTest.java | 12 ++--
.../checkpoint/OperatorSubtaskStateTest.java | 74 ++++++++++++++++++++++
4 files changed, 97 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 4099ec1..c6cb69f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
+import static org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates;
/**
* This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
@@ -231,8 +232,7 @@ public class OperatorSubtaskState implements CompositeStateHandle {
toDispose.addAll(rawOperatorState);
toDispose.addAll(managedKeyedState);
toDispose.addAll(rawKeyedState);
- toDispose.addAll(inputChannelState);
- toDispose.addAll(resultSubpartitionState);
+ toDispose.addAll(collectUniqueDelegates(inputChannelState, resultSubpartitionState));
StateUtil.bestEffortDiscardAllStateObjects(toDispose);
} catch (Exception e) {
LOG.warn("Error while discarding operator states.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
index c359d9d..98132b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -50,6 +53,16 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject {
this.size = size;
}
+    /**
+     * Collects the distinct delegate handles of all channel state handles in the given collections.
+     *
+     * <p>Several channel state handles may wrap the same delegate; gathering the delegates in a
+     * {@link Set} lets a caller (for example when discarding state) visit each delegate only once.
+     * Distinctness is decided by the {@code equals}/{@code hashCode} of the delegates.
+     *
+     * @param collections collections of channel state handles to inspect; none may be {@code null}
+     * @return a new mutable set holding every distinct delegate found
+     */
+    @SafeVarargs // the varargs array is only iterated, never stored or written to
+    public static Set<StreamStateHandle> collectUniqueDelegates(Collection<? extends AbstractChannelStateHandle<?>>... collections) {
+        Set<StreamStateHandle> result = new HashSet<>();
+        for (Collection<? extends AbstractChannelStateHandle<?>> collection : collections) {
+            for (AbstractChannelStateHandle<?> handle : collection) {
+                result.add(handle.getDelegate());
+            }
+        }
+        return result;
+    }
+
@Override
public void discardState() throws Exception {
delegate.discardState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index da1fe16..3514f5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,10 +33,12 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -90,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class);
OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class);
- InputChannelStateHandle inputChannelStateHandle = mock(InputChannelStateHandle.class);
- ResultSubpartitionStateHandle resultSubpartitionStateHandle = mock(ResultSubpartitionStateHandle.class);
+ InputChannelStateHandle inputChannelStateHandle = new InputChannelStateHandle(new InputChannelInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L));
+ ResultSubpartitionStateHandle resultSubpartitionStateHandle = new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L));
final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState(
managedOpHandle,
@@ -125,8 +129,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState();
verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState();
verify(operatorSubtaskState.getRawKeyedState().iterator().next()).discardState();
- verify(operatorSubtaskState.getInputChannelState().iterator().next()).discardState();
- verify(operatorSubtaskState.getResultSubpartitionState().iterator().next()).discardState();
+ verify(operatorSubtaskState.getInputChannelState().iterator().next().getDelegate()).discardState();
+ verify(operatorSubtaskState.getResultSubpartitionState().iterator().next().getDelegate()).discardState();
}
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
new file mode 100644
index 0000000..55360d5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link OperatorSubtaskState} test.
+ */
+public class OperatorSubtaskStateTest {
+    /**
+     * Builds a subtask state whose two input channel handles and two result subpartition handles
+     * all share one delegate, then discards it. The delegate fails the test if it is discarded
+     * a second time, and the final assertion fails the test if it was never discarded.
+     */
+    @Test
+    public void testDiscardDuplicatedDelegatesOnce() {
+        DiscardOnceStreamStateHandle delegate = new DiscardOnceStreamStateHandle();
+        new OperatorSubtaskState(
+            StateObjectCollection.empty(),
+            StateObjectCollection.empty(),
+            StateObjectCollection.empty(),
+            StateObjectCollection.empty(),
+            new StateObjectCollection<>(asList(buildInputChannelHandle(delegate, 1), buildInputChannelHandle(delegate, 2))),
+            new StateObjectCollection<>(asList(buildSubpartitionHandle(delegate, 4), buildSubpartitionHandle(delegate, 3)))
+        ).discardState();
+        // Guards against a vacuous pass: without this check, never discarding at all would go unnoticed.
+        // Fully qualified because only assertFalse is statically imported in this file.
+        org.junit.Assert.assertTrue("state was not discarded", delegate.discarded);
+    }
+
+    /** Creates a result subpartition handle (gate 0, given subpartition index) wrapping {@code delegate}. */
+    private static ResultSubpartitionStateHandle buildSubpartitionHandle(StreamStateHandle delegate, int subPartitionIdx) {
+        return new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, subPartitionIdx), delegate, singletonList(0L));
+    }
+
+    /** Creates an input channel handle (gate 0, given channel index) wrapping {@code delegate}. */
+    private static InputChannelStateHandle buildInputChannelHandle(StreamStateHandle delegate, int inputChannelIdx) {
+        return new InputChannelStateHandle(new InputChannelInfo(0, inputChannelIdx), delegate, singletonList(0L));
+    }
+
+    /** In-memory state handle that records its discard and raises an assertion failure on a repeated one. */
+    private static class DiscardOnceStreamStateHandle extends ByteStreamStateHandle {
+        private static final long serialVersionUID = 1L;
+
+        /** Set to {@code true} by the first call to {@link #discardState()}. */
+        private boolean discarded = false;
+
+        DiscardOnceStreamStateHandle() {
+            super("test", new byte[0]);
+        }
+
+        @Override
+        public void discardState() {
+            super.discardState();
+            assertFalse("state was discarded twice", discarded);
+            discarded = true;
+        }
+    }
+}