Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/26 18:09:53 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #13735: [FLINK-19533][checkpoint] Add channel state reassignment for unaligned checkpoints.

pnowojski commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r512158143



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelRescaler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * The {@link ChannelRescaler} narrows down the channels that need to be read during rescaling to recover from a
+ * particular channel when in-flight data has been stored in the checkpoint.
+ */
+@Internal
+public interface ChannelRescaler extends Serializable {
+	/**
+	 * Returns all old channel indexes that need to be read to restore all buffers for the given new channel index on
+	 * rescale.
+	 */
+	BitSet rescaleIntersections(int newChannelIndex, int oldNumberOfChannels, int newNumberOfChannels);

Review comment:
       What is the relation between this method/interface and `#rescaleIntersections` from the `Partitioner`? Why does one return `int[]` and the other `BitSet`?
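       As far as the information they carry goes, both return types describe the same thing: a set of old channel/partition indexes. The minimal sketch below converts losslessly between them. `RescaleIntersectionsConversion`, `toBitSet` and `toIntArray` are hypothetical helpers for illustration, not Flink code.

```java
import java.util.Arrays;
import java.util.BitSet;

/**
 * Hypothetical helper (not part of Flink): both return types encode a set of old
 * channel/partition indexes, so either can be derived from the other.
 */
public class RescaleIntersectionsConversion {

    /** Converts a Partitioner-style index array into a ChannelRescaler-style BitSet. */
    static BitSet toBitSet(int[] indexes) {
        BitSet bits = new BitSet();
        for (int index : indexes) {
            bits.set(index); // duplicates collapse into a single set bit
        }
        return bits;
    }

    /** Converts a BitSet back into a sorted, duplicate-free index array. */
    static int[] toIntArray(BitSet bits) {
        // BitSet#stream yields the set indexes in increasing order
        return bits.stream().toArray();
    }

    public static void main(String[] args) {
        int[] fromPartitioner = {3, 0, 2, 0};
        BitSet asBitSet = toBitSet(fromPartitioner);
        System.out.println(asBitSet);                              // {0, 2, 3}
        System.out.println(Arrays.toString(toIntArray(asBitSet))); // [0, 2, 3]
    }
}
```

       The only things lost in the `int[]` to `BitSet` direction are ordering and duplicates, which do not matter for "which old partitions must be read".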

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -63,31 +61,25 @@
 	/**
 	 * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}.
 	 */
-	@Nonnull

Review comment:
       Removing `@Nonnull` is considered controversial by some. Some people prefer it as a cheap, debug-mode-only assertion, because, as far as I know, `@Nonnull` in debug mode adds `checkNotNull(...)` equivalents.
   
   I personally would be fine with not using them, and I'm not adding them in new code myself. But for the above reason, I would actually be against removing them from code that someone else added (otherwise we could end up in a ping-pong situation, where you remove them and someone else re-adds them).

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
##########
@@ -37,4 +40,20 @@
 	 * @return The partition index.
 	 */
 	int partition(K key, int numPartitions);
+
+	/**
+	 * Returns all partitions that need to be read to restore the given new partition. The partitioner is then
+	 * applied on the key of the restored record to filter all irrelevant records.
+	 *
+	 * <p>In particular, to create a partition X after rescaling, all partitions returned by this method are fully read
+	 * and the key of each record is then fed into {@link #partition(Object, int)} to check if it belongs to X.
+	 *
+	 * <p>The default implementation states that all partitions need to be scanned and should be overwritten to improve
+	 * performance.
+	 */
+	@PublicEvolving
+	default int[] rescaleIntersections(int newPartition, int oldNumPartitions, int newNumPartitions) {
+		// any old partition may contain a record that should be in the new partition after rescaling
+		return IntStream.range(0, oldNumPartitions).toArray();
+	}

Review comment:
       We need this to support `CustomPartitionerWrapper`, right? This is a `@Public` interface :( I think we need to think twice before committing ourselves to such a change. Let's sync offline again on whether this is really the best (or the only) way of solving our problem.
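       For reference, the restore procedure described in the `Partitioner` Javadoc of this hunk (fully read every old partition returned by `rescaleIntersections`, then keep only the records whose key `partition(...)` maps to the new partition) can be simulated as below. This is a hypothetical sketch: `SimplePartitioner`, `restore`, and the modulo partitioner are illustrative stand-ins, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

/** Hypothetical simulation (not Flink code) of restoring one partition after rescaling. */
public class RescaleRestoreSketch {

    /** Minimal stand-in for org.apache.flink.api.common.functions.Partitioner. */
    interface SimplePartitioner<K> {
        int partition(K key, int numPartitions);

        // Same default as in the diff: any old partition may hold relevant records.
        default int[] rescaleIntersections(int newPartition, int oldNumPartitions, int newNumPartitions) {
            return IntStream.range(0, oldNumPartitions).toArray();
        }
    }

    /** Rebuilds one new partition from the in-flight records of the old partitions. */
    static <K> List<K> restore(
            SimplePartitioner<K> partitioner,
            List<List<K>> oldPartitions,
            int newPartition,
            int newNumPartitions) {
        List<K> restored = new ArrayList<>();
        int[] toRead = partitioner.rescaleIntersections(newPartition, oldPartitions.size(), newNumPartitions);
        for (int oldPartition : toRead) {
            for (K key : oldPartitions.get(oldPartition)) {
                // Filter step: drop records that belong to a different new partition.
                if (partitioner.partition(key, newNumPartitions) == newPartition) {
                    restored.add(key);
                }
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical modulo partitioner over non-negative integer keys.
        SimplePartitioner<Integer> modulo = (key, n) -> key % n;

        // In-flight data of 2 old partitions (key % 2).
        List<List<Integer>> old = Arrays.asList(Arrays.asList(0, 2, 4, 6), Arrays.asList(1, 3, 5, 7));

        // Scale from 2 to 3 partitions and rebuild new partition 1 (key % 3 == 1).
        System.out.println(restore(modulo, old, 1, 3)); // [4, 1, 7]
    }
}
```

       Since the default returns every old partition, correctness depends only on `partition`; an override of `rescaleIntersections` could only reduce how many old partitions have to be read.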




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org