Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/02 15:17:04 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038246186


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   > We might have a big file where the read takes a while (or short checkpointing period), and we face an error at the end of the file
   
   This is orthogonal to the split discovery throttling feature in this PR; it concerns processing semantics (duplicates) upon failure recovery. Flink can achieve exactly-once semantics for state consistency, but whether end-to-end exactly-once holds depends on whether the sink is transactional. The Iceberg sink commits transactionally, so recovery won't cause duplicates in the target Iceberg table.
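
   To illustrate why a transactional sink avoids duplicates on recovery, here is a simplified, hypothetical Java model. It is not the Iceberg Flink sink API; `TransactionalSinkSketch` and its methods are illustrative names. It assumes three things. Rows written after the last completed checkpoint stay invisible until a commit. A failure discards them while the source rewinds to the checkpointed position. Commits are de-duplicated by checkpoint ID.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified model of a transactional sink; not the Iceberg Flink sink API.
   class TransactionalSinkSketch {
     private final List<String> staged = new ArrayList<>();    // written, not yet visible
     private final List<String> tableRows = new ArrayList<>(); // visible to table readers
     private long maxCommittedCheckpointId = -1L;

     void write(String row) {
       staged.add(row);
     }

     /** Publishes staged rows atomically; a replayed commit for an old checkpoint is a no-op. */
     void commit(long checkpointId) {
       if (checkpointId <= maxCommittedCheckpointId) {
         return;
       }
       tableRows.addAll(staged);
       staged.clear();
       maxCommittedCheckpointId = checkpointId;
     }

     /** On failure, rows written after the last committed checkpoint are discarded. */
     void failAndRecover() {
       staged.clear();
     }

     List<String> tableRows() {
       return tableRows;
     }

     public static void main(String[] args) {
       TransactionalSinkSketch sink = new TransactionalSinkSketch();
       sink.write("r1");
       sink.write("r2");
       sink.commit(1L);
       sink.write("r3");      // part of a big file read after checkpoint 1
       sink.failAndRecover(); // error near the end of the file
       sink.commit(1L);       // replayed commit after restart: ignored
       sink.write("r3");      // source rewinds to checkpoint 1 and re-reads r3
       sink.commit(2L);
       System.out.println(sink.tableRows()); // [r1, r2, r3]
     }
   }
   ```

   The re-read row `r3` lands in the table exactly once, because its first copy was never committed.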
   
   > Here is a pathological scenario. The job was in a restart loop with only a few successful checkpoints (like 1 or 2) between restarts. This would essentially bypass the throttling check, and the tracked splits could keep growing. That is the small concern I had regarding not checkpointing. The concern is small because this scenario is very unusual. Hence, I didn't implement the checkpointing of enumeration history (for throttling purposes).
   
   Whether or not we checkpoint the enumeration history only affects the throttling feature in this PR. The pathological scenario above is the only case I can think of where not checkpointing could be a problem. Again, it is an unusual scenario, but it could happen in theory.
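
   The restart-loop concern can be illustrated with a small, hypothetical Java model. `ThrottleHistorySketch` replaces the Guava `EvictingQueue` with a hand-bounded `ArrayDeque`. The diff above ends before the actual throttling rule, so the pause condition used here is an assumption: pause once pending splits reach the total of the recent enumerations. The property being shown does not depend on that rule. A history that is not checkpointed starts empty after every restart, and throttling is only checked once the history is full.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical stand-in for EnumerationHistory without Guava; the pause rule is assumed.
   class ThrottleHistorySketch {
     private final int maxHistorySize;
     private final Deque<Integer> history = new ArrayDeque<>();

     ThrottleHistorySketch(int maxHistorySize) {
       this.maxHistorySize = maxHistorySize;
     }

     synchronized void add(int splitCount) {
       if (history.size() == maxHistorySize) {
         history.removeFirst(); // evict the oldest entry, like EvictingQueue
       }
       history.addLast(splitCount);
     }

     synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCount) {
       if (history.size() < maxHistorySize) {
         return false; // only check throttling when full history is obtained
       }
       // Assumed rule: pause once pending splits reach the total of recent enumerations.
       int recentTotal = history.stream().mapToInt(Integer::intValue).sum();
       return pendingSplitCount >= recentTotal;
     }

     public static void main(String[] args) {
       int pending = 0;
       for (int restart = 0; restart < 5; restart++) {
         // History is not checkpointed, so every restart begins with an empty one.
         ThrottleHistorySketch h = new ThrottleHistorySketch(3);
         for (int enumeration = 0; enumeration < 2; enumeration++) {
           if (!h.shouldPauseSplitDiscovery(pending)) {
             h.add(100);
             pending += 100; // discovered splits that readers have not consumed yet
           }
         }
       }
       System.out.println(pending); // 1000: throttling never kicked in
     }
   }
   ```

   Consider a single long-running history of size 3 under the same assumptions, with 100-split enumerations and no consumption. Discovery pauses after the third enumeration at 300 pending splits. Restarting every two enumerations instead lets the pending count keep growing.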
   
   We can add the checkpointing here without breaking compatibility, because `IcebergEnumeratorStateSerializer` is versioned: we can bump the version and handle both the old and new versions during deserialization.
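
   Here is a minimal sketch of such a version bump, under stated assumptions. The state class, its `position` field, and the byte layout are hypothetical and are not the real `IcebergEnumeratorStateSerializer` format. The interface mirrors the shape of Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize(int version, byte[] serialized)`), but it is declared locally so the example compiles without Flink. Version 1 bytes carry no history and restore to an empty one. Version 2 appends the split-count history.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.Arrays;

   // Hypothetical serializer: v1 = position only, v2 = position + enumeration history.
   class EnumStateSerializerSketch implements VersionedSerializerSketch<EnumStateSketch> {
     static final int VERSION_1 = 1;
     static final int VERSION_2 = 2;

     @Override
     public int getVersion() {
       return VERSION_2; // new checkpoints are always written in the latest format
     }

     @Override
     public byte[] serialize(EnumStateSketch state) throws IOException {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (DataOutputStream out = new DataOutputStream(bytes)) {
         out.writeLong(state.position);
         out.writeInt(state.splitCountHistory.length);
         for (int count : state.splitCountHistory) {
           out.writeInt(count);
         }
       }
       return bytes.toByteArray();
     }

     @Override
     public EnumStateSketch deserialize(int version, byte[] serialized) throws IOException {
       try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
         long position = in.readLong();
         switch (version) {
           case VERSION_1:
             // Old checkpoints carry no history; start empty, as happens today after a restart.
             return new EnumStateSketch(position, new int[0]);
           case VERSION_2: {
             int[] history = new int[in.readInt()];
             for (int i = 0; i < history.length; i++) {
               history[i] = in.readInt();
             }
             return new EnumStateSketch(position, history);
           }
           default:
             throw new IOException("Unknown serializer version: " + version);
         }
       }
     }

     public static void main(String[] args) throws IOException {
       EnumStateSerializerSketch serializer = new EnumStateSerializerSketch();
       byte[] v2 = serializer.serialize(new EnumStateSketch(42L, new int[] {10, 20, 30}));
       EnumStateSketch restored = serializer.deserialize(serializer.getVersion(), v2);
       System.out.println(Arrays.toString(restored.splitCountHistory)); // [10, 20, 30]

       // Bytes as the old (v1) serializer would have written them: just the position.
       ByteArrayOutputStream old = new ByteArrayOutputStream();
       try (DataOutputStream out = new DataOutputStream(old)) {
         out.writeLong(42L);
       }
       EnumStateSketch fromV1 = serializer.deserialize(VERSION_1, old.toByteArray());
       System.out.println(fromV1.position + " " + fromV1.splitCountHistory.length); // 42 0
     }
   }

   // Same shape as Flink's SimpleVersionedSerializer, declared locally for self-containment.
   interface VersionedSerializerSketch<E> {
     int getVersion();

     byte[] serialize(E obj) throws IOException;

     E deserialize(int version, byte[] serialized) throws IOException;
   }

   // Hypothetical enumerator state: a stand-in position plus the split-count history.
   final class EnumStateSketch {
     final long position;
     final int[] splitCountHistory;

     EnumStateSketch(long position, int[] splitCountHistory) {
       this.position = position;
       this.splitCountHistory = splitCountHistory;
     }
   }
   ```

   Writing only the latest version while still reading older ones lets a job restore from a checkpoint taken before the upgrade.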
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org