Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/04/18 22:30:14 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7366: Flink: add event time ordered assigner for FLIP-27 source

stevenzwu commented on code in PR #7366:
URL: https://github.com/apache/iceberg/pull/7366#discussion_r1170385468


##########
flink/v1.16/build.gradle:
##########
@@ -42,6 +42,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}"
     compileOnly "org.apache.flink:flink-connector-base:${flinkVersion}"
     compileOnly "org.apache.flink:flink-connector-files:${flinkVersion}"
+    compileOnly "org.apache.flink:flink-core${flinkVersion}"

Review Comment:
   ```suggestion
       compileOnly "org.apache.flink:flink-core:${flinkVersion}"
   ```
   
   also, what do we need from `flink-core`? I have your old impl working in a local branch without the `flink-core` compileOnly dep.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/GlobalWatermarkTracker.java:
##########
@@ -0,0 +1,109 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;

Review Comment:
   use `javax.annotation.Nullable`



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/GlobalWatermarkTracker.java:
##########
@@ -0,0 +1,109 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Serializable;
+
+/**
+ * Global Watermark Tracker is used to track watermarks across a set of partitions in Flink job.
+ * Watermark in this context captures the per-partition timestamp.
+ *
+ * @param <PartitionType> parametric type of partition
+ */
+interface GlobalWatermarkTracker<PartitionType> {

Review Comment:
   how about calling it `WatermarkAggregator`?
   
   probably also rename `PartitionType` to something else. Partition in the context of Iceberg could be misinterpreted. maybe just `G`, as in `Group`?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssignerType.java:
##########
@@ -29,5 +29,10 @@ public SplitAssignerFactory factory() {
     }
   };
 
+  EVENTTIME {

Review Comment:
   should be `EVENT_TIME_ALIGNED`



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/WatermarkTracker.java:
##########
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+public interface WatermarkTracker {

Review Comment:
   can we add some Javadoc for the class and methods?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/EventTimeAlignedAssigner.java:
##########
@@ -0,0 +1,301 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.calcite.shaded.com.google.common.collect.Iterables.getFirst;
+
+public class EventTimeAlignedAssigner implements SplitAssigner, WatermarkTracker.Listener {
+  private static final Logger log = LoggerFactory.getLogger(EventTimeAlignedAssigner.class);
+  private final WatermarkTracker watermarkTracker;
+  private final State state;
+  private final TimestampAssigner<IcebergSourceSplit> timestampAssigner;
+  private final FutureNotifier futureNotifier;
+  private final Options options;
+
+  // flag that informs the state that there are no more splits to be added by the Enumerator
+  private volatile boolean noMoreSplits;
+
+  public EventTimeAlignedAssigner(
+      WatermarkTracker watermarkTracker,
+      TimestampAssigner<IcebergSourceSplit> timestampAssigner,
+      Options options) {
+    this.watermarkTracker = watermarkTracker;
+    this.state = new State(watermarkTracker, timestampAssigner);
+    this.options = options;
+    this.timestampAssigner = timestampAssigner;
+    this.futureNotifier = new FutureNotifier();
+    this.watermarkTracker.onInitialization();
+  }
+
+  @Override
+  public void start() {
+    watermarkTracker.addListener(this);
+  }
+
+  @Override
+  public void close() {
+    watermarkTracker.removeListener(this);
+    notifyListener();
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    GetSplitResult result = getNextInternal();
+    if (result.status() == GetSplitResult.Status.AVAILABLE) {
+      state.onSplitAssigned(result.split());
+    }
+    return result;
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    state.onDiscoveredSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    state.onUnassignedSplits(splits);
+  }
+
+  @Override
+  public void onCompletedSplits(Collection<String> completedSplitIds) {
+    state.onCompletedSplits(completedSplitIds);
+    if (!completedSplitIds.isEmpty() && isTerminal()) {
+      updateListenersOnTerminalCondition();
+    }
+  }
+
+  @Override
+  public void onNoMoreSplits() {
+    noMoreSplits = true;
+    if (isTerminal()) {
+      updateListenersOnTerminalCondition();
+    }
+  }
+
+  private boolean isWithinBounds(IcebergSourceSplit split, Long watermark) {
+    if (watermark == null) {
+      return true;
+    }
+
+    long splitTs = timestampAssigner.extractTimestamp(split, -1);
+    if (splitTs < watermark) {
+      log.warn("splitTs at {} is lower than the watermark {}", splitTs, watermark);
+    }
+
+    return Math.max(splitTs - watermark, 0L) <= options.getThresholdInMs();
+  }
+
+  private GetSplitResult getNextInternal() {
+    if (state.getUnassignedSplits().isEmpty()) {
+      log.info("looks like there are no splits to be assigned");
+      return GetSplitResult.unavailable();
+    }
+
+    try {
+      Long watermark = watermarkTracker.getGlobalWatermark();
+      IcebergSourceSplit pendingSplit = getFirst(state.getUnassignedSplits(), null);
+      if (!isWithinBounds(pendingSplit, watermark)) {
+        log.info(
+            "split {} is not within bounds {} {}",
+            pendingSplit,
+            watermark,
+            timestampAssigner.extractTimestamp(pendingSplit, -1));
+        return GetSplitResult.constrained();
+      }
+
+      return GetSplitResult.forSplit(pendingSplit);
+    } catch (Exception e) {
+      log.error("Couldn't obtain the watermark from the tracker", e);
+      return GetSplitResult.unavailable();
+    }
+  }
+
+  private void updateListenersOnTerminalCondition() {
+    state.onNoMoreStatusChanges();
+  }
+
+  @Override
+  public Collection<IcebergSourceSplitState> state() {
+    return
+        state
+            .getUnassignedSplits()
+            .stream()
+            .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED))
+            .collect(Collectors.toList());
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    CompletableFuture<Void> result = futureNotifier.future();
+    checkAndNotifyListener();
+    return result;
+  }
+
+  private boolean hasNext() {
+    return getNextInternal().status().equals(GetSplitResult.Status.AVAILABLE);
+  }
+
+  private void checkAndNotifyListener() {
+    if (hasNext()) {
+      log.info("Looks like the future can be completed with an assignment; completing the future");
+      notifyListener();
+    }
+  }
+
+  private void notifyListener() {
+    // Simply complete the future and return;
+    futureNotifier.notifyComplete();
+  }
+
+  @Override
+  public int pendingSplitCount() {
+    return state.getUnassignedSplits().size();
+  }
+
+  @Override
+  public void onWatermarkChange(Long watermark) {
+    notifyListener();
+  }
+
+  // checks if the assigner state has reached the terminal condition
+  private boolean isTerminal() {
+    // check if there are no more splits to be added by the system
+    // check if all the splits have been completed
+    return (noMoreSplits && state.getUnassignedSplits().isEmpty());
+  }
+
+  public static class Options implements Serializable {
+
+    private final long thresholdInMs;
+
+    public Options(Duration maxMisalignmentThreshold) {
+      this.thresholdInMs = maxMisalignmentThreshold.toMillis();
+    }
+
+    public long getThresholdInMs() {
+      return thresholdInMs;
+    }
+  }
+
+  /**
+   * Keeps track of unassigned splits ordered by certain comparator.
+   */
+  @NotThreadSafe
+  static
+  class State {

Review Comment:
   don't know if this really brings a lot of value. the tracker is used both outside and inside this nested class, and we also need to duplicate the set of APIs. what about merging the functionality into the outer class?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/WatermarkTracker.java:
##########
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+public interface WatermarkTracker {

Review Comment:
   Do we need both WatermarkTracker and GlobalWatermarkTracker?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/AscendingTimestampSplitComparator.java:
##########
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+/**
+ * Creates ordering for splits based on a timestamp assigner. If two splits have the same timestamp
+ * assigned, then the ordering is based on the split ID. Assumes split IDs to be unique across
+ * splits.
+ */
+class AscendingTimestampSplitComparator
+    implements Comparator<IcebergSourceSplit>, Serializable {
+
+  private final TimestampAssigner<IcebergSourceSplit> timestampAssigner;
+
+  public AscendingTimestampSplitComparator(TimestampAssigner<IcebergSourceSplit> timestampAssigner) {
+    this.timestampAssigner = timestampAssigner;
+  }
+
+  @Override
+  public int compare(IcebergSourceSplit a, IcebergSourceSplit b) {
+    if (a.splitId().equals(b.splitId())) {
+      return 0;
+    }
+    int t =

Review Comment:
   nit: Iceberg coding style has an empty line after a control block `}`
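
For illustration, the spacing this nit asks for might look like the sketch below. `DemoSplit` and `DemoSplitComparator` are hypothetical stand-ins, not classes from the PR, and everything after the first `if` block is an assumption based on the Javadoc above (order by timestamp, then by split ID), since the quoted hunk is cut off at `int t =`.

```java
import java.util.Comparator;

/** Hypothetical stand-in for IcebergSourceSplit: only an ID and a timestamp. */
final class DemoSplit {
  final String id;
  final long timestamp;

  DemoSplit(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

/** Orders splits by timestamp, falling back to the ID when timestamps are equal. */
class DemoSplitComparator implements Comparator<DemoSplit> {
  @Override
  public int compare(DemoSplit a, DemoSplit b) {
    if (a.id.equals(b.id)) {
      return 0;
    }

    // An empty line follows each closing `}` of a control block.
    int t = Long.compare(a.timestamp, b.timestamp);
    if (t != 0) {
      return t;
    }

    return a.id.compareTo(b.id);
  }
}
```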



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/GlobalWatermarkTrackers.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.annotation.VisibleForTesting;
+
+/**
+ * GlobalWatermarkTracker Factory that caches the issued trackers. The caching is required because
+ * we could have multiple Iceberg Sources (one for each kafka source) trying to create the
+ * WatermarkTracker independently in parallel. This implementation ensures that there's only one
+ * instance for a given partition type.
+ */
+public class GlobalWatermarkTrackers implements GlobalWatermarkTracker.Factory {
+
+  private static final Map<String, GlobalWatermarkTracker<?>> trackerCache = new HashMap<>();
+
+  @SuppressWarnings("unchecked")
+  public synchronized <PartitionType> GlobalWatermarkTracker<PartitionType> apply(String name) {

Review Comment:
   I am not quite following the `PartitionType` concept. seems like a partition is like a source, if I understand it correctly
   
   Should we also introduce the `watermarkGroup` concept that the Flink watermark alignment API provides?
   
   https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/api/common/eventtime/WatermarkAlignmentParams.html
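
As a rough sketch of what keying the cache by a watermark group name could look like: sources that name the same group share one tracker instance. `WatermarkGroupRegistry` and `forGroup` are hypothetical names, and `AtomicLong` is only a placeholder for whatever tracker type the PR settles on.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical registry: one shared tracker per watermark group name. */
class WatermarkGroupRegistry {
  // AtomicLong is a placeholder for the real tracker type.
  private static final Map<String, AtomicLong> GROUPS = new ConcurrentHashMap<>();

  /** Returns the tracker for the group, creating it on first use. */
  static AtomicLong forGroup(String watermarkGroup) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
    // asking for the same group get the same instance.
    return GROUPS.computeIfAbsent(watermarkGroup, name -> new AtomicLong(Long.MIN_VALUE));
  }
}
```

With this shape the group name, rather than a partition type parameter, decides which sources are aligned together.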



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/EventTimeAlignedAssigner.java:
##########
@@ -0,0 +1,301 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.calcite.shaded.com.google.common.collect.Iterables.getFirst;
+
+public class EventTimeAlignedAssigner implements SplitAssigner, WatermarkTracker.Listener {

Review Comment:
   a lot of methods here probably need to be synchronized. with that, not sure if `FutureNotifier` is still needed.
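
A minimal sketch of the idea, assuming splits can be modeled as plain strings and ignoring the watermark check entirely: with every state-touching method synchronized, a single `CompletableFuture` field can stand in for a separate notifier. `SynchronizedAssignerSketch` and its members are hypothetical and not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical assigner skeleton; splits are modeled as plain strings. */
class SynchronizedAssignerSketch {
  private final Deque<String> pending = new ArrayDeque<>();
  private CompletableFuture<Void> availableFuture;

  synchronized void onDiscoveredSplits(Collection<String> splits) {
    pending.addAll(splits);
    completeAvailableFutureIfNeeded();
  }

  /** Returns the next pending split, or null when none is available. */
  synchronized String getNext() {
    return pending.poll();
  }

  synchronized CompletableFuture<Void> isAvailable() {
    if (!pending.isEmpty()) {
      return CompletableFuture.completedFuture(null);
    }

    if (availableFuture == null) {
      availableFuture = new CompletableFuture<>();
    }

    return availableFuture;
  }

  // Only called from synchronized methods, so the lock is already held.
  private void completeAvailableFutureIfNeeded() {
    if (availableFuture != null && !pending.isEmpty()) {
      availableFuture.complete(null);
      availableFuture = null;
    }
  }
}
```

One caveat with this shape: callbacks chained on the future without an executor run on the thread that calls `complete`, i.e. while the lock is held, so they should stay short.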



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/EventTimeAwareSplitAssignerFactory.java:
##########
@@ -0,0 +1,52 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+public class EventTimeAwareSplitAssignerFactory implements SplitAssignerFactory {
+  private final WatermarkTracker watermarkTracker;
+  private final TimestampAssigner<IcebergSourceSplit> timestampAssigner;
+
+  private final EventTimeAlignedAssigner.Options options;
+
+  public EventTimeAwareSplitAssignerFactory(
+      WatermarkTracker watermarkTracker,
+      TimestampAssigner<IcebergSourceSplit> timestampAssigner, EventTimeAlignedAssigner.Options options) {

Review Comment:
   `Options` is used in a public API. not sure if it should be a nested class inside `EventTimeAlignedAssigner`. also, we should add the config to the common classes `FlinkReadOptions` and `FlinkReadConf`



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/GlobalWatermarkTracker.java:
##########
@@ -0,0 +1,109 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Serializable;
+
+/**
+ * Global Watermark Tracker is used to track watermarks across a set of partitions in Flink job.
+ * Watermark in this context captures the per-partition timestamp.
+ *
+ * @param <PartitionType> parametric type of partition
+ */
+interface GlobalWatermarkTracker<PartitionType> {
+
+  /**
+   * If none of the partitions have registered, then this returns null.
+   *
+   * @return global watermark of all registered partitions
+   * @throws Exception if there are issues talking to the tracker.
+   */
+  @Nullable
+  Long getGlobalWatermark() throws Exception;
+
+  // updates the local watermark for a given partition
+  Long updateWatermarkForPartition(PartitionType partition, long watermark) throws Exception;

Review Comment:
   nit: just call it `update` or `aggregate`?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/GlobalWatermarkTracker.java:
##########
@@ -0,0 +1,109 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Serializable;
+
+/**
+ * Global Watermark Tracker is used to track watermarks across a set of partitions in Flink job.
+ * Watermark in this context captures the per-partition timestamp.
+ *
+ * @param <PartitionType> parametric type of partition
+ */
+interface GlobalWatermarkTracker<PartitionType> {

Review Comment:
   wondering if we need an interface here. should we just rename `InMemoryGlobalWatermarkTracker` as `WatermarkAggregator`? do we envision any other aggregator implementation?
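
To make the suggestion concrete, a single concrete class might look roughly like the sketch below. It is hypothetical: the type parameter `G` and the method name `update` follow the naming suggestions in the comments above, and defining the global watermark as the minimum across groups is an assumption, not something the quoted code confirms.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory aggregator; G identifies one group member, e.g. a source. */
class WatermarkAggregator<G> {
  private final Map<G, Long> watermarks = new HashMap<>();

  /** Returns null until at least one group has reported a watermark. */
  synchronized Long getGlobalWatermark() {
    // Assumption: the global watermark is the minimum of the per-group values.
    return watermarks.values().stream().min(Long::compare).orElse(null);
  }

  /** Stores the latest watermark for the group and returns the new global value. */
  synchronized Long update(G group, long watermark) {
    watermarks.put(group, watermark);
    return getGlobalWatermark();
  }
}
```

If another implementation is needed later, for example one backed by an external service, an interface can still be extracted from this class at that point.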



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

