You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 11:56:10 UTC

[flink] 01/02: [FLINK-17988][checkpointing] Discard only unique channel state delegates

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b70ce9d1831877578374e357d19e1345c68646f9
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 28 00:09:36 2020 +0200

    [FLINK-17988][checkpointing] Discard only unique channel state delegates
    
    The underlying state handles of channel state handles can be the same.
    Discard should only iterate over unique underlying handles.
---
 .../runtime/checkpoint/OperatorSubtaskState.java   |  4 +-
 .../runtime/state/AbstractChannelStateHandle.java  | 13 ++++
 .../CheckpointCoordinatorFailureTest.java          | 12 ++--
 .../checkpoint/OperatorSubtaskStateTest.java       | 74 ++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 4099ec1..c6cb69f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
+import static org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates;
 
 /**
  * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical)
@@ -231,8 +232,7 @@ public class OperatorSubtaskState implements CompositeStateHandle {
 			toDispose.addAll(rawOperatorState);
 			toDispose.addAll(managedKeyedState);
 			toDispose.addAll(rawKeyedState);
-			toDispose.addAll(inputChannelState);
-			toDispose.addAll(resultSubpartitionState);
+			toDispose.addAll(collectUniqueDelegates(inputChannelState, resultSubpartitionState));
 			StateUtil.bestEffortDiscardAllStateObjects(toDispose);
 		} catch (Exception e) {
 			LOG.warn("Error while discarding operator states.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
index c359d9d..98132b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.Internal;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -50,6 +53,16 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject {
 		this.size = size;
 	}
 
+	public static Set<StreamStateHandle> collectUniqueDelegates(Collection<? extends AbstractChannelStateHandle<?>>... collections) {
+		Set<StreamStateHandle> result = new HashSet<>();
+		for (Collection<? extends AbstractChannelStateHandle<?>> collection : collections) {
+			for (AbstractChannelStateHandle<?> handle : collection) {
+				result.add(handle.getDelegate());
+			}
+		}
+		return result;
+	}
+
 	@Override
 	public void discardState() throws Exception {
 		delegate.discardState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index da1fe16..3514f5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,10 +33,12 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -90,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
 		OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class);
 		OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class);
-		InputChannelStateHandle inputChannelStateHandle = mock(InputChannelStateHandle.class);
-		ResultSubpartitionStateHandle resultSubpartitionStateHandle = mock(ResultSubpartitionStateHandle.class);
+		InputChannelStateHandle inputChannelStateHandle = new InputChannelStateHandle(new InputChannelInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L));
+		ResultSubpartitionStateHandle resultSubpartitionStateHandle = new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L));
 
 		final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState(
 			managedOpHandle,
@@ -125,8 +129,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState();
 		verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState();
 		verify(operatorSubtaskState.getRawKeyedState().iterator().next()).discardState();
-		verify(operatorSubtaskState.getInputChannelState().iterator().next()).discardState();
-		verify(operatorSubtaskState.getResultSubpartitionState().iterator().next()).discardState();
+		verify(operatorSubtaskState.getInputChannelState().iterator().next().getDelegate()).discardState();
+		verify(operatorSubtaskState.getResultSubpartitionState().iterator().next().getDelegate()).discardState();
 	}
 
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
new file mode 100644
index 0000000..55360d5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link OperatorSubtaskState} test.
+ */
+public class OperatorSubtaskStateTest {
+	@Test
+	public void testDiscardDuplicatedDelegatesOnce() {
+		StreamStateHandle delegate = new DiscardOnceStreamStateHandle();
+		new OperatorSubtaskState(
+			StateObjectCollection.empty(),
+			StateObjectCollection.empty(),
+			StateObjectCollection.empty(),
+			StateObjectCollection.empty(),
+			new StateObjectCollection<>(asList(buildInputChannelHandle(delegate, 1), buildInputChannelHandle(delegate, 2))),
+			new StateObjectCollection<>(asList(buildSubpartitionHandle(delegate, 4), buildSubpartitionHandle(delegate, 3)))
+		).discardState();
+	}
+
+	private ResultSubpartitionStateHandle buildSubpartitionHandle(StreamStateHandle delegate, int subPartitionIdx1) {
+		return new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, subPartitionIdx1), delegate, singletonList(0L));
+	}
+
+	private InputChannelStateHandle buildInputChannelHandle(StreamStateHandle delegate, int inputChannelIdx) {
+		return new InputChannelStateHandle(new InputChannelInfo(0, inputChannelIdx), delegate, singletonList(0L));
+	}
+
+	private static class DiscardOnceStreamStateHandle extends ByteStreamStateHandle {
+		private static final long serialVersionUID = 1L;
+
+		private boolean discarded = false;
+
+		DiscardOnceStreamStateHandle() {
+			super("test", new byte[0]);
+		}
+
+		@Override
+		public void discardState() {
+			super.discardState();
+			assertFalse("state was discarded twice", discarded);
+			discarded = true;
+		}
+	}
+}