Posted to issues@iceberg.apache.org by "yegangy0718 (via GitHub)" <gi...@apache.org> on 2023/04/19 16:46:49 UTC

[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #7360: Flink: Implement data statistics coordinator to aggregate data statistics from operator subtasks

yegangy0718 commented on code in PR #7360:
URL: https://github.com/apache/iceberg/pull/7360#discussion_r1170639078


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {

Review Comment:
   Makes sense to me. A reader may think `AggregateDataStatistics` is another implementation of `DataStatistics`. I will rename the class to `GlobalStatisticsAccumulator` and add Javadoc.
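The rename signals that the class wraps a statistics object rather than implementing one. The following is a minimal sketch of how the renamed class and its Javadoc might look. It is not the PR's code: `SimpleStatistics` and `StatisticsFactory` are hypothetical stand-ins for the PR's `DataStatistics<K>` and `DataStatisticsFactory<K>`, whose full interfaces are not shown in this thread.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the PR's DataStatistics<K>: a simple key counter. */
class SimpleStatistics<K> implements Serializable {
  private final Map<K, Long> counts = new HashMap<>();

  void add(K key, long count) {
    counts.merge(key, count, Long::sum);
  }

  Map<K, Long> counts() {
    return counts;
  }
}

/** Hypothetical stand-in for the PR's DataStatisticsFactory<K>. */
interface StatisticsFactory<K> extends Serializable {
  SimpleStatistics<K> createDataStatistics();
}

/**
 * Accumulates the data statistics reported by operator subtasks for one checkpoint into a single
 * global statistics object. It owns a statistics instance created by the factory; it is a
 * container, not another statistics implementation.
 */
class GlobalStatisticsAccumulator<K> implements Serializable {
  private final long checkpointId;
  private final SimpleStatistics<K> dataStatistics;
  // Subtasks already merged for this checkpoint (the merge method is omitted from this sketch).
  private final Set<Integer> subtaskSet = new HashSet<>();

  GlobalStatisticsAccumulator(long checkpointId, StatisticsFactory<K> statisticsFactory) {
    this.checkpointId = checkpointId;
    this.dataStatistics = statisticsFactory.createDataStatistics();
  }

  long checkpointId() {
    return checkpointId;
  }

  /** Number of distinct subtasks that have reported so far. */
  int aggregateSize() {
    return subtaskSet.size();
  }

  SimpleStatistics<K> dataStatistics() {
    return dataStatistics;
  }
}
```

A freshly created accumulator starts with an empty statistics object obtained from the factory, for example `new GlobalStatisticsAccumulator<>(3L, SimpleStatistics::new)`.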



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(

Review Comment:
   `mergeDataStatistic` is called by `handleDataStatisticRequest`, and `handleDataStatisticRequest` is called by `handleEventFromOperator` in the coordinator thread.
   If the `checkArgument` fails, it throws an `IllegalArgumentException`, which the `runInCoordinatorThread` method catches before calling `context.failJob`:
   ```
     private void runInCoordinatorThread(
         ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
       ensureStarted();
       coordinatorExecutor.execute(
           () -> {
             try {
               action.run();
             } catch (Throwable t) {
               // if we have a JVM critical error, promote it immediately, there is a good
               // chance the logging or job failing will not succeed anymore
               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
   
               String actionString = String.format(actionName, actionNameFormatParameters);
               LOG.error(
                   "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
                   operatorName,
                   actionString,
                   t);
               context.failJob(t);
             }
           });
   ```
   Also note that `handleEventFromOperator` is not called during checkpointing, so the coordinator cannot abort any checkpoint from this path.
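The failure path described above can be reproduced in isolation. This is a minimal sketch, not the PR's code: `ThrowingAction` is a hypothetical stand-in for Flink's `ThrowingRunnable`, the `failJob` callback stands in for `context.failJob`, and the `VirtualMachineError` check only roughly approximates `ExceptionUtils.rethrowIfFatalErrorOrOOM`. It shows that an `IllegalArgumentException` thrown inside the action never reaches the caller of `runInCoordinatorThread`; it is routed to `failJob` on the coordinator thread instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for Flink's ThrowingRunnable<Throwable>. */
@FunctionalInterface
interface ThrowingAction {
  void run() throws Throwable;
}

class CoordinatorThreadSketch {
  // Single-thread executor, as in the coordinator, so actions run one at a time.
  private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
  private final Consumer<Throwable> failJob;

  CoordinatorThreadSketch(Consumer<Throwable> failJob) {
    this.failJob = failJob;
  }

  void runInCoordinatorThread(ThrowingAction action) {
    coordinatorExecutor.execute(
        () -> {
          try {
            action.run();
          } catch (Throwable t) {
            // Rough approximation of ExceptionUtils.rethrowIfFatalErrorOrOOM.
            if (t instanceof VirtualMachineError) {
              throw (VirtualMachineError) t;
            }
            // Any other failure, e.g. a failed checkArgument, fails the whole job.
            failJob.accept(t);
          }
        });
  }

  void close() {
    coordinatorExecutor.shutdown();
    try {
      coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The caller of `runInCoordinatorThread` returns immediately; the job failure is triggered asynchronously on the executor thread.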



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's less than the expected percentage {}. Thus dropping the incomplete aggregate data statistics {} and starting collecting data statistics from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
+          } else {

Review Comment:
   Will update to:
   ```
   Preconditions.checkArgument(event instanceof DataStatisticsEvent);
   handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
   ```
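With this change, an unexpected event type raises an `IllegalArgumentException`, which `runInCoordinatorThread` turns into a job failure just like a failed check in `mergeDataStatistic`. Flink's `Preconditions` also provides `checkArgument(boolean, Object errorMessage)`, so the offending event type could be named in the message. The sketch below shows that dispatch pattern without Flink on the classpath; `SketchOperatorEvent`, `SketchStatisticsEvent`, and the local `checkArgument` helper are hypothetical stand-ins for `OperatorEvent`, `DataStatisticsEvent`, and Flink's `Preconditions.checkArgument`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's OperatorEvent marker interface. */
interface SketchOperatorEvent {}

/** Hypothetical stand-in for DataStatisticsEvent, carrying only a checkpoint id. */
final class SketchStatisticsEvent implements SketchOperatorEvent {
  final long checkpointId;

  SketchStatisticsEvent(long checkpointId) {
    this.checkpointId = checkpointId;
  }
}

class EventDispatchSketch {
  // Checkpoint ids of the statistics events handled so far.
  final List<Long> handledCheckpoints = new ArrayList<>();

  // Local equivalent of Preconditions.checkArgument(boolean, Object).
  private static void checkArgument(boolean condition, Object errorMessage) {
    if (!condition) {
      throw new IllegalArgumentException(String.valueOf(errorMessage));
    }
  }

  void handleEventFromOperator(int subtask, SketchOperatorEvent event) {
    checkArgument(
        event instanceof SketchStatisticsEvent,
        "Invalid operator event type: " + event.getClass().getName());
    handleDataStatisticRequest(subtask, (SketchStatisticsEvent) event);
  }

  private void handleDataStatisticRequest(int subtask, SketchStatisticsEvent event) {
    handledCheckpoints.add(event.checkpointId);
  }
}
```

Note that the message string is built eagerly on every call; the template overload `checkArgument(boolean, String, Object...)` defers formatting until the check fails.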



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(
+        checkpointId == event.checkpointId(),
+        "Received unexpected event from checkpoint %s. Expected checkpoint %s",
+        event.checkpointId(),
+        checkpointId);
+    if (!subtaskSet.add(subtask)) {
+      return;

Review Comment:
   Makes sense. I will add:
   ```
   LOG.debug("Receive duplicated data statistics for checkpoint {} subtask {}. Ignore it.", checkpointId, subtask);
   ```
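The existing early return relies on `Set.add` returning `false` when the element is already present, so the new debug log fires exactly for repeated reports from the same subtask. A minimal sketch of the whole guard, assuming `java.util.logging` in place of SLF4J so it runs without dependencies (hence `%d` placeholders instead of `{}`); the class name and the `mergedEvents` counter, which stands in for the real merge work, are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

class DuplicateAwareMergeSketch {
  private static final Logger LOG = Logger.getLogger(DuplicateAwareMergeSketch.class.getName());

  private final long checkpointId;
  private final Set<Integer> subtaskSet = new HashSet<>();
  // Number of events actually merged; stands in for merging real statistics.
  private int mergedEvents = 0;

  DuplicateAwareMergeSketch(long checkpointId) {
    this.checkpointId = checkpointId;
  }

  /** Returns true if the event was merged, false if it was a duplicate from the same subtask. */
  boolean mergeDataStatistic(int subtask, long eventCheckpointId) {
    if (checkpointId != eventCheckpointId) {
      throw new IllegalArgumentException(
          String.format(
              "Received unexpected event from checkpoint %s. Expected checkpoint %s",
              eventCheckpointId, checkpointId));
    }
    // Set.add returns false when this subtask has already reported for this checkpoint.
    if (!subtaskSet.add(subtask)) {
      LOG.fine(
          () ->
              String.format(
                  "Receive duplicated data statistics for checkpoint %d subtask %d. Ignore it.",
                  checkpointId, subtask));
      return false;
    }
    mergedEvents++;
    return true;
  }

  int mergedEvents() {
    return mergedEvents;
  }

  int aggregateSize() {
    return subtaskSet.size();
  }
}
```

The supplier form of `Logger.fine` skips building the message when FINE logging is disabled, which is the same benefit SLF4J's `{}` placeholders give `LOG.debug`.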



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());

Review Comment:
   [Screenshot 2023-04-18 at 5:00:48 PM: https://user-images.githubusercontent.com/8229749/232930010-a8f74c25-240e-4ab8-bbd6-4431653ab79e.png]
   I also use `dataStatistics()` in the unit tests to verify the data statistics merged by `GlobalStatisticsAccumulator`.
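A sketch of that test pattern follows: merge the local statistics of two subtasks, then read the merged result back through `dataStatistics()` and compare it with the expected key counts. It assumes hypothetical map-based statistics (`CountingAccumulatorSketch` is not a class from the PR, whose `DataStatistics` implementations are not shown here) and plain assertions instead of a test framework.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical accumulator whose statistics are plain key -> count maps. */
class CountingAccumulatorSketch {
  private final Map<String, Long> dataStatistics = new HashMap<>();
  private final Set<Integer> subtaskSet = new HashSet<>();

  void mergeDataStatistic(int subtask, Map<String, Long> localStatistics) {
    if (!subtaskSet.add(subtask)) {
      return; // duplicate report from the same subtask
    }
    localStatistics.forEach((key, count) -> dataStatistics.merge(key, count, Long::sum));
  }

  // Exposed so tests can verify the merged result.
  Map<String, Long> dataStatistics() {
    return dataStatistics;
  }
}

class CountingAccumulatorSketchTest {
  static void verifyMergedStatistics() {
    CountingAccumulatorSketch accumulator = new CountingAccumulatorSketch();
    Map<String, Long> subtask0 = new HashMap<>();
    subtask0.put("a", 1L);
    Map<String, Long> subtask1 = new HashMap<>();
    subtask1.put("a", 2L);
    subtask1.put("b", 1L);

    accumulator.mergeDataStatistic(0, subtask0);
    accumulator.mergeDataStatistic(1, subtask1);

    Map<String, Long> expected = new HashMap<>();
    expected.put("a", 3L);
    expected.put("b", 1L);
    if (!expected.equals(accumulator.dataStatistics())) {
      throw new AssertionError("Unexpected merged statistics: " + accumulator.dataStatistics());
    }
  }
}
```

Keeping the accessor package-private, as in the PR, lets tests in the same package inspect the merged result without widening the public API.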



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));

Review Comment:
   Will remove `globalDataStatistics` from the error message.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      this.gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        this.gateways[i] = new HashMap<>();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway gateway) {

Review Comment:
   Flink itself doesn't log this. My only concern is that with many subtasks, a debug log here will generate a lot of log lines. But it doesn't hurt.
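A minimal sketch of the per-subtask gateway bookkeeping discussed here, assuming the shape visible in the quoted `SubtaskGateways` (one `Map<Integer, ...>` per subtask index, keyed by attempt number). The method bodies are not shown in the diff, so the behavior here (rejecting a duplicate registration, requiring exactly one gateway before sending) is an assumption. `GatewaySketch` and `SimpleGateway` are hypothetical stand-ins for Flink's `OperatorCoordinator.SubtaskGateway`, and `java.util.logging` replaces SLF4J only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for Flink's OperatorCoordinator.SubtaskGateway.
interface GatewaySketch {
  int subtask();

  int attemptNumber();
}

// Hypothetical gateway implementation used only for illustration.
final class SimpleGateway implements GatewaySketch {
  private final int subtask;
  private final int attemptNumber;

  SimpleGateway(int subtask, int attemptNumber) {
    this.subtask = subtask;
    this.attemptNumber = attemptNumber;
  }

  @Override
  public int subtask() {
    return subtask;
  }

  @Override
  public int attemptNumber() {
    return attemptNumber;
  }
}

// Sketch: one map per subtask index, keyed by execution attempt number.
final class SubtaskGatewaysSketch {
  private static final Logger LOG = Logger.getLogger(SubtaskGatewaysSketch.class.getName());
  private final Map<Integer, GatewaySketch>[] gateways;

  @SuppressWarnings("unchecked")
  SubtaskGatewaysSketch(int parallelism) {
    this.gateways = new Map[parallelism];
    for (int i = 0; i < parallelism; ++i) {
      gateways[i] = new HashMap<>();
    }
  }

  void register(GatewaySketch gateway) {
    int index = gateway.subtask();
    int attempt = gateway.attemptNumber();
    if (gateways[index].putIfAbsent(attempt, gateway) != null) {
      throw new IllegalStateException(
          "Gateway already registered for subtask " + index + " attempt " + attempt);
    }
    // The supplier is evaluated only when FINE (debug) logging is enabled,
    // so one log line per subtask costs almost nothing when it is switched off.
    LOG.fine(() -> "Registered gateway for subtask " + index + " attempt " + attempt);
  }

  void unregister(int subtaskIndex, int attemptNumber) {
    gateways[subtaskIndex].remove(attemptNumber);
  }

  void reset(int subtaskIndex) {
    gateways[subtaskIndex].clear();
  }

  // Sending is only safe when exactly one attempt of the subtask is registered.
  GatewaySketch getOnlyGatewayAndCheckReady(int subtaskIndex) {
    Map<Integer, GatewaySketch> attempts = gateways[subtaskIndex];
    if (attempts.size() != 1) {
      throw new IllegalStateException(
          "Expected one gateway for subtask " + subtaskIndex + " but found " + attempts.size());
    }
    return attempts.values().iterator().next();
  }
}
```

With SLF4J, the parameterized form `LOG.debug("... {} ...", subtask)` similarly defers message formatting until debug logging is enabled, which addresses most of the volume concern.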



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(

Review Comment:
   Do you mean renaming `runInCoordinatorThread` to `runInCoordinatorTasks`?
   The executor is defined by:
   ```
   ExecutorService coordinatorExecutor =
           Executors.newSingleThreadExecutor(coordinatorThreadFactory);
   ```
   When implementing this class, I actually took https://github.com/apache/flink/blob/423cdcb99c3f66be435ad2e70d6a15f10a69e252/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java#L183 as a reference.
   Since every action runs on that single coordinator thread, I think `runInCoordinatorThread` is the more fitting name. WDYT?
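A minimal sketch of why a single-thread executor makes "coordinator thread" a meaningful name, assuming a thread factory that remembers the thread it created (the quoted code calls `isCurrentThreadCoordinatorThread()` on `CoordinatorExecutorThreadFactory`, but its body is not shown). `CoordinatorThreadFactorySketch` and `CoordinatorThreadSketch` are hypothetical names; the delegate-or-run-inline logic mirrors the quoted `callInCoordinatorThread`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical thread factory that remembers the thread it created, so code can check
// whether it is already running on the coordinator thread.
final class CoordinatorThreadFactorySketch implements ThreadFactory {
  private final String threadName;
  private volatile Thread thread;

  CoordinatorThreadFactorySketch(String threadName) {
    this.threadName = threadName;
  }

  @Override
  public synchronized Thread newThread(Runnable runnable) {
    // A single-thread executor replaces its worker if it dies, so track the latest one.
    Thread created = new Thread(runnable, threadName);
    created.setDaemon(true);
    thread = created;
    return created;
  }

  boolean isCurrentThreadCoordinatorThread() {
    return Thread.currentThread() == thread;
  }
}

// Hypothetical helper: every action is funneled through one single-thread executor.
final class CoordinatorThreadSketch {
  private final CoordinatorThreadFactorySketch threadFactory;
  private final ExecutorService coordinatorExecutor;

  CoordinatorThreadSketch(String threadName) {
    this.threadFactory = new CoordinatorThreadFactorySketch(threadName);
    this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
  }

  <T> T callInCoordinatorThread(Callable<T> callable) {
    try {
      if (threadFactory.isCurrentThreadCoordinatorThread()) {
        // Already on the coordinator thread: run inline. Submitting and blocking on get()
        // here would deadlock, because the only worker thread would wait on itself.
        return callable.call();
      }
      return coordinatorExecutor.submit(callable).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the coordinator thread", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Coordinator action failed", e.getCause());
    } catch (Exception e) {
      throw new IllegalStateException("Coordinator action failed", e);
    }
  }

  void close() {
    coordinatorExecutor.shutdown();
    try {
      coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Because there is exactly one worker thread, every action observes the coordinator's state sequentially, which is the property the name `runInCoordinatorThread` conveys.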
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -138,6 +137,14 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     localStatistics = statisticsFactory.createDataStatistics();
   }
 
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    super.notifyCheckpointComplete(checkpointId);
+    // Send global statistics to partitioners at checkpoint to update data distribution at the same

Review Comment:
   This comment refers to line 145.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);

Review Comment:
   Will remove `final`.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();

Review Comment:
   Indeed, there is not much to do in the `start` method. The executor doesn't need any call to start; whenever there is a job or task, we simply submit it to the executor.
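A minimal sketch of the lifecycle described in this comment, assuming a plain `Executors.newSingleThreadExecutor()`: `start()` only flips a flag, actions are rejected before it, and `close()` mirrors the quoted `shutdownNow()` plus `awaitTermination()`. `CoordinatorLifecycleSketch` and `isClosed()` are hypothetical names, and the bounded 10-second wait is a choice made here, not the PR's `Long.MAX_VALUE`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the coordinator lifecycle discussed above.
final class CoordinatorLifecycleSketch {
  // The executor returned by newSingleThreadExecutor() is backed by a ThreadPoolExecutor,
  // which creates its worker thread when the first task arrives, so nothing needs starting.
  private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
  private volatile boolean started;

  void start() {
    started = true;
  }

  void runInCoordinatorThread(Runnable action) {
    if (!started) {
      throw new IllegalStateException("The coordinator has not started yet.");
    }
    coordinatorExecutor.execute(action);
  }

  void close() {
    // Interrupt running work and drop queued tasks, then wait (bounded here) for termination.
    coordinatorExecutor.shutdownNow();
    try {
      coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isClosed() {
    return coordinatorExecutor.isTerminated();
  }
}
```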



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's less than the expected percentage {}. Thus dropping the incomplete aggregate data statistics {} and starting collecting data statistics from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d (#%d)",
+        event,

Review Comment:
   This error message is only logged when an exception happens.
   I will update it to log only the event class.
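A minimal sketch of the pattern behind `runInCoordinatorThread`'s `actionName` / `actionNameFormatParameters` arguments: the description is formatted only inside the `catch`, and passing `event.getClass().getSimpleName()` instead of the event keeps a potentially large statistics payload out of the message. This runs the action synchronously for easy testing, uses `java.util.logging` instead of SLF4J, and `ThrowingAction` / `ActionRunnerSketch` are hypothetical names.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical functional interface standing in for Flink's ThrowingRunnable.
interface ThrowingAction {
  void run() throws Exception;
}

final class ActionRunnerSketch {
  private static final Logger LOG = Logger.getLogger(ActionRunnerSketch.class.getName());
  private final Consumer<Throwable> failJob;

  ActionRunnerSketch(Consumer<Throwable> failJob) {
    this.failJob = failJob;
  }

  // Runs the action; returns the formatted action description on failure, or null on success.
  String run(ThrowingAction action, String actionName, Object... actionNameFormatParameters) {
    try {
      action.run();
      return null;
    } catch (Exception e) {
      // The description is only built on this (rare) failure path.
      String actionString = String.format(actionName, actionNameFormatParameters);
      LOG.log(Level.SEVERE, "Uncaught exception while " + actionString + ". Triggering job failover.", e);
      failJob.accept(e);
      return actionString;
    }
  }
}
```

For example, `run(action, "handling operator event %s from subtask %d (#%d)", event.getClass().getSimpleName(), subtask, attemptNumber)` produces a short, bounded message regardless of how large the event's statistics are.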
   
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {

Review Comment:
   SGTM. Will rename it. 
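A simplified sketch of the aggregation flow in the quoted `handleDataStatisticRequest`: events for an already completed checkpoint or an older in-progress checkpoint are ignored; when a newer checkpoint arrives early, the in-progress aggregate is accepted only if at least 80% of subtasks reported (otherwise it is dropped); and a full report completes immediately. Statistics are modeled here as key counts, `CheckpointAccumulator` and `StatisticsAggregatorSketch` are hypothetical names, and ignoring duplicate reports from one subtask is an assumption, since `mergeDataStatistic`'s body is not shown.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical accumulator: merged key counts for one checkpoint plus the reporting subtasks.
final class CheckpointAccumulator {
  final long checkpointId;
  final Set<Integer> subtasks = new HashSet<>();
  final Map<String, Long> counts = new HashMap<>();

  CheckpointAccumulator(long checkpointId) {
    this.checkpointId = checkpointId;
  }

  void merge(int subtask, Map<String, Long> local) {
    if (subtasks.add(subtask)) { // assumption: ignore duplicate reports from one subtask
      local.forEach((key, count) -> counts.merge(key, count, Long::sum));
    }
  }
}

final class StatisticsAggregatorSketch {
  static final double EXPECTED_RECEIVED_FRACTION = 0.8;
  private final int parallelism;
  private CheckpointAccumulator inProgress;
  private CheckpointAccumulator completed;

  StatisticsAggregatorSketch(int parallelism) {
    this.parallelism = parallelism;
  }

  // Returns a newly completed aggregate to broadcast to subtasks, or null if none completed.
  CheckpointAccumulator onEvent(int subtask, long checkpointId, Map<String, Long> local) {
    if (completed != null && completed.checkpointId >= checkpointId) {
      return null; // aggregation for this (or a newer) checkpoint already completed
    }
    if (inProgress == null) {
      inProgress = new CheckpointAccumulator(checkpointId);
    }
    if (inProgress.checkpointId > checkpointId) {
      return null; // stale event from an older checkpoint
    }
    if (inProgress.checkpointId < checkpointId) {
      // A newer checkpoint arrived before every subtask reported for the current one.
      CheckpointAccumulator previous = inProgress;
      inProgress = new CheckpointAccumulator(checkpointId);
      inProgress.merge(subtask, local);
      if ((double) previous.subtasks.size() / parallelism >= EXPECTED_RECEIVED_FRACTION) {
        completed = previous; // enough subtasks reported: accept the partial aggregate
        return completed;
      }
      return null; // too few reports: drop the partial aggregate
    }
    inProgress.merge(subtask, local);
    if (inProgress.subtasks.size() == parallelism) {
      completed = inProgress;
      inProgress = null;
      return completed;
    }
    return null;
  }
}
```

One difference worth noting: this sketch tags the accepted partial aggregate with its own checkpoint id, whereas the quoted code passes the new in-progress checkpoint id to `sendDataStatisticsToSubtasks` on that path.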



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()

Review Comment:
   The challenge with putting the logic into `GlobalStatisticsAccumulator` is that the operations on `inProgressAggregation` and `lastCompleteAggregation` differ depending on which condition holds.
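   
   A hypothetical sketch (not code from this PR) of why the branching is hard to move into the accumulator. The decision itself can be computed from the in-progress checkpoint ID, the accumulated subtask count, and the parallelism. Each outcome, however, mutates `inProgressAggregation` and `lastCompleteAggregation` differently, so those mutations would still have to live in the coordinator. The class, enum, and method names are illustrative assumptions. The 0.8 threshold mirrors `EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE`, and the sketch leaves out the earlier "already completed" check.
   
   ```java
   // Hypothetical sketch: names are illustrative and not taken from the PR.
   class AggregationDecisionSketch {
     // Mirrors EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE in DataStatisticsCoordinator.
     static final double EXPECTED_PERCENTAGE = 0.8;
   
     enum Action {
       // Event belongs to an older checkpoint than the in-progress aggregation.
       IGNORE_STALE_EVENT,
       // Same checkpoint: merge into the in-progress aggregation.
       MERGE_INTO_IN_PROGRESS,
       // Newer checkpoint and enough subtasks reported: the in-progress aggregation
       // becomes the last complete one and is sent to subtasks; a new one is started.
       PROMOTE_AND_START_NEW,
       // Newer checkpoint but too few subtasks reported: the in-progress aggregation
       // is dropped and a new one is started.
       DROP_AND_START_NEW
     }
   
     static Action decide(
         long inProgressCheckpointId, int accumulatedSubtasks, int parallelism, long eventCheckpointId) {
       if (eventCheckpointId < inProgressCheckpointId) {
         return Action.IGNORE_STALE_EVENT;
       }
       if (eventCheckpointId == inProgressCheckpointId) {
         return Action.MERGE_INTO_IN_PROGRESS;
       }
       double receivedRatio = (double) accumulatedSubtasks / parallelism;
       return receivedRatio >= EXPECTED_PERCENTAGE
           ? Action.PROMOTE_AND_START_NEW
           : Action.DROP_AND_START_NEW;
     }
   
     public static void main(String[] args) {
       // In-progress aggregation is for checkpoint 5 with parallelism 5.
       System.out.println(decide(5, 4, 5, 6)); // PROMOTE_AND_START_NEW (4/5 = 0.8)
       System.out.println(decide(5, 3, 5, 6)); // DROP_AND_START_NEW (3/5 = 0.6)
       System.out.println(decide(5, 2, 5, 5)); // MERGE_INTO_IN_PROGRESS
       System.out.println(decide(5, 2, 5, 4)); // IGNORE_STALE_EVENT
     }
   }
   ```
   
   `IGNORE_STALE_EVENT` touches neither field. `MERGE_INTO_IN_PROGRESS` touches only the in-progress aggregation. Only `PROMOTE_AND_START_NEW` also overwrites `lastCompleteAggregation`.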



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(
+        checkpointId == event.checkpointId(),
+        "Received unexpected event from checkpoint %s. Expected checkpoint %s",
+        event.checkpointId(),
+        checkpointId);
+    if (!subtaskSet.add(subtask)) {
+      return;
+    }
+
+    dataStatistics.merge(event.dataStatistics());
+  }
+
+  long aggregateSize() {

Review Comment:
   OK. I will rename the function to `accumulatedSubtasksCount`.
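   
   A minimal sketch of the renamed accumulator. It assumes a plain `Map<String, Long>` of per-key counts as a stand-in for `DataStatistics<K>`; the real class merges via `DataStatistics#merge`. It keeps the same checkpoint-ID check and per-subtask de-duplication as `AggregateDataStatistics` in the diff. The class name and the `merge` signature are hypothetical; only `accumulatedSubtasksCount` comes from the comment.
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical sketch; Map<String, Long> stands in for DataStatistics<K>.
   class GlobalStatisticsAccumulatorSketch {
     private final long checkpointId;
     // Merged statistics: key -> record count.
     private final Map<String, Long> statistics = new HashMap<>();
     // Subtasks whose statistics were already merged; guards against double counting.
     private final Set<Integer> subtaskSet = new HashSet<>();
   
     GlobalStatisticsAccumulatorSketch(long checkpointId) {
       this.checkpointId = checkpointId;
     }
   
     long checkpointId() {
       return checkpointId;
     }
   
     Map<String, Long> statistics() {
       return statistics;
     }
   
     void merge(int subtask, long eventCheckpointId, Map<String, Long> subtaskStatistics) {
       if (eventCheckpointId != checkpointId) {
         throw new IllegalArgumentException(
             String.format(
                 "Received unexpected event from checkpoint %s. Expected checkpoint %s",
                 eventCheckpointId, checkpointId));
       }
       if (!subtaskSet.add(subtask)) {
         return; // duplicate report from the same subtask: ignore it
       }
       subtaskStatistics.forEach((key, count) -> statistics.merge(key, count, Long::sum));
     }
   
     // Number of distinct subtasks merged so far (formerly aggregateSize()).
     int accumulatedSubtasksCount() {
       return subtaskSet.size();
     }
   
     public static void main(String[] args) {
       GlobalStatisticsAccumulatorSketch accumulator = new GlobalStatisticsAccumulatorSketch(5L);
       accumulator.merge(0, 5L, Map.of("a", 2L));
       accumulator.merge(1, 5L, Map.of("a", 3L, "b", 1L));
       accumulator.merge(0, 5L, Map.of("a", 100L)); // duplicate from subtask 0: ignored
       System.out.println(accumulator.accumulatedSubtasksCount()); // 2
       System.out.println(accumulator.statistics().get("a")); // 5
     }
   }
   ```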



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's less than the expected percentage {}. Thus dropping the incomplete aggregate data statistics {} and starting collecting data statistics from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d (#%d)",
+        event,
+        subtask,
+        attemptNumber);
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for checkpoint {}",
+              operatorName,
+              checkpointId);
+          try {
+            byte[] serializedDataDistributionWeight =

Review Comment:
   We need to checkpoint both the statistics and the checkpoint ID (the subtask set is not needed),
   because `#handleDataStatisticRequest` compares the checkpoint ID of `lastCompleteAggregation` with the event's checkpoint ID:
   ```
       if (lastCompleteAggregation != null
           && lastCompleteAggregation.checkpointId() >= checkpointId) {
         LOG.debug(
             "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
             lastCompleteAggregation.checkpointId(),
             subtask,
             checkpointId);
         return;
       }
   ```
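   
   A minimal sketch of what the snapshot could contain. It assumes the same `Map<String, Long>` stand-in for the statistics and writes only the checkpoint ID and the merged statistics, leaving out the subtask set. The byte layout uses `java.io.DataOutputStream` purely for illustration. The class and method names are hypothetical, and the PR (which imports `InstantiationUtil`) may serialize differently. `shouldIgnore` reproduces the `>=` comparison quoted above, which is why the checkpoint ID must survive a restore.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch of a coordinator snapshot: checkpoint ID + statistics, no subtask set.
   class CompletedAggregationSnapshotSketch {
     final long checkpointId;
     final Map<String, Long> statistics;
   
     CompletedAggregationSnapshotSketch(long checkpointId, Map<String, Long> statistics) {
       this.checkpointId = checkpointId;
       this.statistics = statistics;
     }
   
     byte[] serialize() {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (DataOutputStream out = new DataOutputStream(bytes)) {
         out.writeLong(checkpointId);
         out.writeInt(statistics.size());
         for (Map.Entry<String, Long> entry : statistics.entrySet()) {
           out.writeUTF(entry.getKey());
           out.writeLong(entry.getValue());
         }
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return bytes.toByteArray();
     }
   
     static CompletedAggregationSnapshotSketch deserialize(byte[] data) {
       try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
         long checkpointId = in.readLong();
         int size = in.readInt();
         Map<String, Long> statistics = new HashMap<>();
         for (int i = 0; i < size; i++) {
           // Arguments are evaluated left to right: key first, then count.
           statistics.put(in.readUTF(), in.readLong());
         }
         return new CompletedAggregationSnapshotSketch(checkpointId, statistics);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }
   
     // Same check as in handleDataStatisticRequest: events for completed checkpoints are ignored.
     boolean shouldIgnore(long eventCheckpointId) {
       return checkpointId >= eventCheckpointId;
     }
   }
   ```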



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null

Review Comment:
   SGTM. Will rename it. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();

Review Comment:
   `ExecutorService#shutdown` (initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted) and `ExecutorService#shutdownNow` (attempts to stop all actively executing tasks and halts the processing of waiting tasks) serve different purposes. 
   `DataStatisticsCoordinator#close` first calls `context.close()`, which uses `ExecutorService#shutdown`, and then calls `coordinatorExecutor.shutdownNow()` to shut everything down. 
   
   ```
     @Override
     public void close() throws Exception {
       LOG.info("Closing data statistics coordinator for {}.", operatorName);
       try {
         if (started) {
           context.close();
         }
       } finally {
         coordinatorExecutor.shutdownNow();
         // We do not expect this to actually block for long. At this point, there should
         // be very few task running in the executor, if any.
         coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
       }
     }
   ``` 
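   
   A small, self-contained illustration of the two calls on a plain `Executors.newSingleThreadExecutor()`. The class name is hypothetical, and this executor is not the coordinator's. `shutdown()` leaves the running task alone but rejects new submissions. `shutdownNow()` then interrupts the task so the executor can terminate. In the quoted code, `context.close()` additionally waits with `awaitTermination` after `shutdown()`; the sketch skips that wait so the effect of each call stays visible.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical illustration of shutdown() followed by shutdownNow().
   class ShutdownSequenceSketch {
     /** Returns {rejectedAfterShutdown, terminatedAfterShutdownNow}. */
     static boolean[] run() {
       ExecutorService executor = Executors.newSingleThreadExecutor();
       CountDownLatch taskStarted = new CountDownLatch(1);
       try {
         // A long-running task standing in for in-flight coordinator work.
         executor.execute(
             () -> {
               taskStarted.countDown();
               try {
                 Thread.sleep(60_000);
               } catch (InterruptedException e) {
                 // shutdownNow() interrupts the worker thread, which ends the task early.
                 Thread.currentThread().interrupt();
               }
             });
         taskStarted.await();
   
         // Step 1, like context.close(): orderly shutdown, new tasks are rejected.
         executor.shutdown();
         boolean rejected = false;
         try {
           executor.execute(() -> {});
         } catch (RejectedExecutionException e) {
           rejected = true;
         }
   
         // Step 2, like coordinatorExecutor.shutdownNow(): interrupt the running task.
         executor.shutdownNow();
         boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
         return new boolean[] {rejected, terminated};
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       } finally {
         executor.shutdownNow();
       }
     }
   
     public static void main(String[] args) {
       boolean[] result = run();
       System.out.println("rejected after shutdown: " + result[0]);
       System.out.println("terminated after shutdownNow: " + result[1]);
     }
   }
   ```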
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {

Review Comment:
   Will rename it to `parallelism`.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);

Review Comment:
   I double-checked how `SourceCoordinatorContext` handles this. 
   It calls `callInCoordinatorThread` once per subtask:
   
   ```
       void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
           checkSubtaskIndex(subtaskId);
   
           callInCoordinatorThread(
                   () -> {
                       final OperatorCoordinator.SubtaskGateway gateway =
                               subtaskGateways.getOnlyGatewayAndCheckReady(subtaskId);
                       gateway.sendEvent(event);
                       return null;
                   },
                   String.format("Failed to send event %s to subtask %d", event, subtaskId));
       }
   ```
   I don't see much difference between the two options. One runs the loop over all subtasks inside a single coordinator-thread call. The other loops outside and calls into the coordinator thread once per subtask. Since `callInCoordinatorThread` is a synchronous call, if any send fails or throws an exception, the events for the remaining subtasks won't be sent in either case.
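   Here is a minimal, self-contained sketch that makes the comparison concrete. It is a stand-in, not the PR's code: `SendLoopSketch` and `send` are hypothetical, and so is the simplified `callInCoordinatorThread`. That helper always submits to a single daemon thread and blocks on `Future.get()`, skipping the real helper's shortcut for when it is already on the coordinator thread. The gateway for one subtask throws. In both variants the exception stops the loop, so the subtasks after the failing one receive nothing.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   class SendLoopSketch {
     // Single daemon "coordinator" thread, so the JVM can exit without an explicit shutdown.
     private static final ExecutorService COORDINATOR =
         Executors.newSingleThreadExecutor(
             r -> {
               Thread t = new Thread(r, "coordinator-sketch");
               t.setDaemon(true);
               return t;
             });

     // Hypothetical, simplified callInCoordinatorThread: always submits to the coordinator
     // thread, blocks until the callable finishes, and rethrows any failure to the caller.
     static void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
       try {
         COORDINATOR.submit(callable).get();
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(errorMessage, e);
       }
     }

     // Hypothetical gateway send: the failing subtask throws, the others record delivery.
     static void send(List<Integer> delivered, int subtask, int failingSubtask) {
       if (subtask == failingSubtask) {
         throw new IllegalStateException("Gateway for subtask " + subtask + " is not ready");
       }
       delivered.add(subtask);
     }

     // Option 1: one coordinator-thread call whose body loops over all subtasks.
     static List<Integer> loopInside(int parallelism, int failingSubtask) {
       List<Integer> delivered = new ArrayList<>();
       try {
         callInCoordinatorThread(
             () -> {
               for (int i = 0; i < parallelism; ++i) {
                 send(delivered, i, failingSubtask);
               }
               return null;
             },
             "Failed to send statistics");
       } catch (RuntimeException e) {
         // The failure reaches the caller; subtasks after the failing one were never reached.
       }
       return delivered;
     }

     // Option 2: loop outside, one coordinator-thread call per subtask.
     static List<Integer> loopOutside(int parallelism, int failingSubtask) {
       List<Integer> delivered = new ArrayList<>();
       try {
         for (int i = 0; i < parallelism; ++i) {
           final int subtask = i;
           callInCoordinatorThread(
               () -> {
                 send(delivered, subtask, failingSubtask);
                 return null;
               },
               "Failed to send statistics to subtask " + subtask);
         }
       } catch (RuntimeException e) {
         // Same outcome: the synchronous call throws and the outer loop stops.
       }
       return delivered;
     }

     public static void main(String[] args) {
       System.out.println(loopInside(4, 2)); // prints [0, 1]
       System.out.println(loopOutside(4, 2)); // prints [0, 1]
     }
   }
   ```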



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's less than the expected percentage {}. Thus dropping the incomplete aggregate data statistics {} and starting collecting data statistics from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);

Review Comment:
   Yes, we can do that.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;

Review Comment:
   I googled percentage vs. ratio. Below is the result:
   ```
   A ratio is a comparison of two similar quantities. Given any two similar quantities a and b, the ratio of a to b that is a:b is defined as a:b = a/b, where b≠0. Percentage means 'by the hundred' or 'divide by one hundred'. The percentage is also used to compare quantities, which means 'per 100'.
   ```
   In our case, we want to make sure we receive data statistics from a sufficient share of the subtasks, so I think PERCENTAGE is the more appropriate name.
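   For reference, here is a small sketch of how the threshold is evaluated, mirroring the check in `handleDataStatisticRequest` above. The constant is compared against the fraction of subtasks that have reported, so 0.8 corresponds to 80% of the subtasks. `ThresholdSketch` and `enoughStatistics` are hypothetical names used only for illustration.

   ```java
   class ThresholdSketch {
     // Same value as in the PR: a fraction of subtasks (0.8 == 80%).
     static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;

     // Mirrors the PR's check: (double) aggregateSize / parallelism >= threshold.
     static boolean enoughStatistics(int aggregateSize, int parallelism) {
       return (double) aggregateSize / parallelism >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE;
     }

     public static void main(String[] args) {
       System.out.println(enoughStatistics(4, 5)); // prints true: 4 / 5 = 0.8
       System.out.println(enoughStatistics(7, 10)); // prints false: 0.7 < 0.8
       System.out.println(enoughStatistics(80, 100)); // prints true: 80 / 100 = 0.8
     }
   }
   ```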



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {

Review Comment:
   Same reply as https://github.com/apache/iceberg/pull/7360/files#r1170076511



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In the end a custom
+ * partitioner will distribute traffic based on the global data statistics to improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's more than the expected percentage {}. Thus sending the aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. It's less than the expected percentage {}. Thus dropping the incomplete aggregate data statistics {} and starting collecting data statistics from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d (#%d)",
+        event,
+        subtask,
+        attemptNumber);
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for checkpoint {}",
+              operatorName,
+              checkpointId);
+          try {
+            byte[] serializedDataDistributionWeight =
+                InstantiationUtil.serializeObject(completeAggregateDataStatistics);
+            resultFuture.complete(serializedDataDistributionWeight);
+          } catch (Throwable e) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
+            resultFuture.completeExceptionally(
+                new CompletionException(
+                    String.format("Failed to checkpoint data statistics for %s", operatorName), e));
+          }
+        },
+        "taking checkpoint %d",
+        checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", operatorName, checkpointId);
+    completeAggregateDataStatistics =
+        InstantiationUtil.deserializeObject(
+            checkpointData, AggregateDataStatistics.class.getClassLoader());
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    this.runInCoordinatorThread(

Review Comment:
   Right. I will remove all unneeded `this` qualifiers.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+
+public class DataStatisticsCoordinatorProvider<K extends Serializable>
+    extends RecreateOnResetOperatorCoordinator.Provider {
+
+  private final String operatorName;
+  private final DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public DataStatisticsCoordinatorProvider(
+      String operatorName, OperatorID operatorID, DataStatisticsFactory<K> dataStatisticsFactory) {
+    super(operatorID);
+    this.operatorName = operatorName;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @Override
+  public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+    DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, context.getUserCodeClassloader());
+    ExecutorService coordinatorExecutor =
+        Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    DataStatisticsCoordinatorContext<K> dataStatisticsCoordinatorContext =

Review Comment:
   When implementing `DataStatisticsCoordinatorProvider`, I used `SourceCoordinatorProvider` as a reference: https://github.com/apache/flink/blob/423cdcb99c3f66be435ad2e70d6a15f10a69e252/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java#L75 . `SourceCoordinatorContext` is more complex because it needs to assign splits and send events to source readers. `DataStatisticsCoordinatorContext` is much simpler, so we can let `DataStatisticsCoordinator` initialize the executor and the context itself, as you suggested.
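   A minimal sketch of that shape, assuming the coordinator owns its single-thread executor instead of receiving it from the provider. `CoordinatorSketch` and `CoordinatorThreadFactory` are hypothetical names, and the Flink types (`OperatorCoordinator.Context`, the event gateways, the error logging) are left out so the sketch runs on its own. The thread factory remembers the thread it creates, which is what a check like `isCurrentThreadCoordinatorThread()` in the diff relies on to avoid resubmitting work from the coordinator thread to itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class CoordinatorSketch implements AutoCloseable {

  /** Hypothetical thread factory that remembers the single thread it creates. */
  static class CoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;
    private volatile Thread thread;

    CoordinatorThreadFactory(String threadName) {
      this.threadName = threadName;
    }

    @Override
    public synchronized Thread newThread(Runnable runnable) {
      // A single-thread executor may replace a dead worker, so keep the latest thread.
      thread = new Thread(runnable, threadName);
      return thread;
    }

    boolean isCurrentThreadCoordinatorThread() {
      return Thread.currentThread() == thread;
    }
  }

  private final CoordinatorThreadFactory threadFactory;
  private final ExecutorService executor;

  CoordinatorSketch(String operatorName) {
    // The coordinator, not the provider, creates the executor it runs on.
    threadFactory = new CoordinatorThreadFactory("DataStatisticsCoordinator-" + operatorName);
    executor = Executors.newSingleThreadExecutor(threadFactory);
  }

  /** Runs the callable on the coordinator thread, or directly if already on it. */
  <T> T callInCoordinatorThread(Callable<T> callable) throws Exception {
    if (threadFactory.isCurrentThreadCoordinatorThread()) {
      return callable.call(); // resubmitting here would block forever on get()
    }
    try {
      return executor.submit(callable).get();
    } catch (ExecutionException e) {
      throw new RuntimeException("Coordinator task failed", e.getCause());
    }
  }

  @Override
  public void close() throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    try (CoordinatorSketch coordinator = new CoordinatorSketch("sink")) {
      String name = coordinator.callInCoordinatorThread(() -> Thread.currentThread().getName());
      System.out.println(name); // prints DataStatisticsCoordinator-sink
    }
  }
}
```

   With this layout the provider only has to pass the operator name (and, in the real code, the Flink context) to the coordinator constructor.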



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      this.gateways = new Map[parallelism];

Review Comment:
   `gateways` is an array of `Map`s. We can use `Maps.newHashMap()` to initialize each map in the array, but I'm not sure how to use `Maps` to create the array itself. Do you have an example?
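   One possible answer, as a sketch: Java rejects creating an array of a parameterized type directly (`new Map<Integer, G>[n]` is a compile error), so the usual options are a raw `Map[]` with an unchecked cast whose slots are filled one by one, or a `List` of maps, which needs no cast. `Maps.newHashMap()` only creates the individual maps; the sketch uses `new HashMap<>()` instead so it runs without the relocated Guava. `GatewayArrays` and the `String` gateway type are hypothetical stand-ins for `SubtaskGateways` and `OperatorCoordinator.SubtaskGateway`, and keying each map by attempt number is an assumption based on `unregisterSubtaskGateway(subtaskIndex, attemptNumber)` in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GatewayArrays {

  /** Option 1: raw array plus unchecked cast; every slot must be initialized. */
  @SuppressWarnings("unchecked")
  static <G> Map<Integer, G>[] newGatewayArray(int parallelism) {
    // new Map<Integer, G>[parallelism] does not compile (generic array creation),
    // so create a raw Map[] and cast it.
    Map<Integer, G>[] gateways = (Map<Integer, G>[]) new Map[parallelism];
    for (int i = 0; i < parallelism; ++i) {
      gateways[i] = new HashMap<>(); // equivalent to Maps.newHashMap()
    }
    return gateways;
  }

  /** Option 2: a list of maps avoids the unchecked cast entirely. */
  static <G> List<Map<Integer, G>> newGatewayList(int parallelism) {
    List<Map<Integer, G>> gateways = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
      gateways.add(new HashMap<>());
    }
    return gateways;
  }

  public static void main(String[] args) {
    Map<Integer, String>[] array = newGatewayArray(2);
    array[1].put(0, "gateway-1-0"); // key: attempt number of subtask 1
    System.out.println(array[1]); // prints {0=gateway-1-0}

    List<Map<Integer, String>> list = newGatewayList(2);
    System.out.println(list.get(0).isEmpty()); // prints true
  }
}
```

   The array form keeps the field declaration from the diff unchanged; the list form trades `gateways[i]` for `gateways.get(i)` but needs no `@SuppressWarnings`.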



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org