Posted to issues@iceberg.apache.org by "yegangy0718 (via GitHub)" <gi...@apache.org> on 2023/04/03 04:53:31 UTC

[GitHub] [iceberg] yegangy0718 opened a new pull request, #7269: Flink: Data statistics operator sends local data statistics event to coordinator and receive aggregated data statistics from coordinator for smart shuffling

yegangy0718 opened a new pull request, #7269:
URL: https://github.com/apache/iceberg/pull/7269

   This PR is created as part of issue https://github.com/apache/iceberg/issues/6303 and project https://github.com/apache/iceberg/projects/27
   
   In this PR, we implement the logic in DataStatisticsOperator to send local data statistics to the coordinator and receive aggregated data statistics from the coordinator. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "yegangy0718 (via GitHub)" <gi...@apache.org>.
yegangy0718 commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157840311


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsEvent.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics;
+
+/**
+ * DataStatisticsEvent is sent between data statistics coordinator and operator to transmit data
+ * statistics
+ */
+@Internal
+class DataStatisticsEvent<K> implements OperatorEvent {
+
+  private static final long serialVersionUID = 1L;
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+
+  DataStatisticsEvent(long checkpointId, DataStatistics<K> dataStatistics) {
+    this.checkpointId = checkpointId;
+    this.dataStatistics = dataStatistics;
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(

Review Comment:
   Right, I should use `MoreObjects` as the other classes do. I will update it in the latest commit.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -126,8 +133,9 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
       globalStatisticsState.add(globalStatistics);

Review Comment:
   I will use a follow-up PR to address the serialization. 



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {
-    // TODO: receive event with aggregated statistics from coordinator and update globalStatistics
+    if (evt instanceof DataStatisticsEvent) {
+      globalStatistics = ((DataStatisticsEvent<K>) evt).dataStatistics();
+      output.collect(
+          new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
+    } else {
+      throw new IllegalStateException("Received unexpected operator event " + evt);

Review Comment:
   OK. I will remove the `IllegalStateException` and add:
   ```
   Preconditions.checkArgument(
       event instanceof DataStatisticsEvent,
       "Received unexpected operator event " + event.getClass());
   ```



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {

Review Comment:
   OK. I took it from the interface. I will rename it to `event`.





[GitHub] [iceberg] stevenzwu merged pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7269:
URL: https://github.com/apache/iceberg/pull/7269




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1156898170


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsEvent.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics;
+
+/**
+ * DataStatisticsEvent is sent between data statistics coordinator and operator to transmit data
+ * statistics
+ */
+@Internal
+class DataStatisticsEvent<K> implements OperatorEvent {
+
+  private static final long serialVersionUID = 1L;
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+
+  DataStatisticsEvent(long checkpointId, DataStatistics<K> dataStatistics) {
+    this.checkpointId = checkpointId;
+    this.dataStatistics = dataStatistics;
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(

Review Comment:
   nit: Use `MoreObjects.toStringHelper(this)` instead?
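
For readers unfamiliar with the helper being suggested, here is a minimal sketch of the `toString` style it produces. To stay dependency-free it does not call Guava: `ToStringSketch` is a hypothetical stand-in that only imitates the `ClassName{name=value, ...}` output of `MoreObjects.toStringHelper`, and `EventSketch` replaces the PR's `DataStatistics<K>` field with a plain `Map<K, Long>`, which is an assumption made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-in that imitates the output style of Guava's MoreObjects.toStringHelper. */
final class ToStringSketch {
  private final StringJoiner joiner;

  private ToStringSketch(Object self) {
    // Prefix is the simple class name followed by '{'; fields are joined with ", ".
    this.joiner = new StringJoiner(", ", self.getClass().getSimpleName() + "{", "}");
  }

  static ToStringSketch of(Object self) {
    return new ToStringSketch(self);
  }

  ToStringSketch add(String name, Object value) {
    joiner.add(name + "=" + value);
    return this;
  }

  @Override
  public String toString() {
    return joiner.toString();
  }
}

/** Simplified event: a Map replaces the PR's DataStatistics<K> type (assumption). */
final class EventSketch<K> {
  private final long checkpointId;
  private final Map<K, Long> dataStatistics;

  EventSketch(long checkpointId, Map<K, Long> dataStatistics) {
    this.checkpointId = checkpointId;
    this.dataStatistics = dataStatistics;
  }

  @Override
  public String toString() {
    // Each field is named once; no format string has to be kept in sync with the fields.
    return ToStringSketch.of(this)
        .add("checkpointId", checkpointId)
        .add("dataStatistics", dataStatistics)
        .toString();
  }

  public static void main(String[] args) {
    System.out.println(new EventSketch<>(7L, Collections.singletonMap("a", 2L)));
  }
}
```

In the actual class the call would presumably be `MoreObjects.toStringHelper(this).add(...).toString()` from Guava; the stand-in above only shows why the builder style is easier to maintain than a hand-written `String.format` template.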





[GitHub] [iceberg] stevenzwu commented on pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#issuecomment-1497859837

   @hililiwei do you have more comments for this PR?




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157713657


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {
-    // TODO: receive event with aggregated statistics from coordinator and update globalStatistics
+    if (evt instanceof DataStatisticsEvent) {
+      globalStatistics = ((DataStatisticsEvent<K>) evt).dataStatistics();
+      output.collect(
+          new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
+    } else {
+      throw new IllegalStateException("Received unexpected operator event " + evt);

Review Comment:
   nit: Iceberg typically uses `Preconditions` for this type of check.
   
   Also, we probably don't want to include the event string in the error message, as it can include the big statistics map. Just the class name should be sufficient.
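
A minimal sketch of the check described in this comment, under stated assumptions: `OperatorEventSketch`, `StatisticsEventSketch`, and `OtherEventSketch` are hypothetical stand-ins for Flink's `OperatorEvent` and the PR's event classes, and the local `checkArgument` only mirrors the shape of Guava's `Preconditions.checkArgument(boolean, String, Object...)` so the example runs without dependencies. The point it illustrates is that the error message carries the event's class, not its `toString()`.

```java
/** Hypothetical stand-in for Flink's OperatorEvent. */
interface OperatorEventSketch {}

/** Hypothetical stand-in for the PR's DataStatisticsEvent; the payload is a plain String here. */
final class StatisticsEventSketch implements OperatorEventSketch {
  private final String payload;

  StatisticsEventSketch(String payload) {
    this.payload = payload;
  }

  String payload() {
    return payload;
  }
}

/** An unexpected event whose toString() is large and should stay out of error messages. */
final class OtherEventSketch implements OperatorEventSketch {
  @Override
  public String toString() {
    return "huge statistics payload ...";
  }
}

final class EventCheckSketch {
  private EventCheckSketch() {}

  /** Local stand-in mirroring the shape of Guava's Preconditions.checkArgument. */
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Rejects unexpected events, reporting only their class, then returns the payload. */
  static String handle(OperatorEventSketch event) {
    checkArgument(
        event instanceof StatisticsEventSketch,
        "Received unexpected operator event %s",
        event.getClass());
    return ((StatisticsEventSketch) event).payload();
  }

  public static void main(String[] args) {
    System.out.println(handle(new StatisticsEventSketch("stats")));
    try {
      handle(new OtherEventSketch());
    } catch (IllegalArgumentException e) {
      // The message names the class; the event's toString() is never evaluated.
      System.out.println(e.getMessage());
    }
  }
}
```

Passing `event.getClass()` as a template argument keeps the message small regardless of how large the event's contents are.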





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157710196


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {

Review Comment:
   Missed this in the last PR. Iceberg coding style tries to avoid unnecessary abbreviations like `evt`. Maybe just `event` or `operatorEvent`.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157713790


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -126,8 +133,9 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
       globalStatisticsState.add(globalStatistics);

Review Comment:
   Just realized one thing that I missed in the last PR. It can be addressed in a separate PR. We don't want to use Kryo Java serialization for `DataStatistics`. We need a stable parser (e.g. `SimpleVersionedSerializer`). You can find an example in `IcebergEnumeratorStateSerializer`.
   
   You can find more context in https://github.com/apache/iceberg/issues/1698.
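
To make the suggestion concrete, here is a rough sketch of what a versioned, hand-written serializer could look like. It is not the follow-up PR's implementation. `VersionedSerializerSketch` is a local stand-in declaring the same three methods as Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize`), so the example compiles without Flink, and representing the statistics as a `Map<String, Long>` is an assumption made only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Local stand-in with the same three methods as Flink's SimpleVersionedSerializer. */
interface VersionedSerializerSketch<E> {
  int getVersion();

  byte[] serialize(E obj) throws IOException;

  E deserialize(int version, byte[] serialized) throws IOException;
}

/** Writes an explicit, stable binary layout: entry count, then (UTF key, long count) pairs. */
final class MapStatisticsSerializerSketch
    implements VersionedSerializerSketch<Map<String, Long>> {
  private static final int VERSION = 1;

  @Override
  public int getVersion() {
    return VERSION;
  }

  @Override
  public byte[] serialize(Map<String, Long> statistics) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(statistics.size());
      for (Map.Entry<String, Long> entry : statistics.entrySet()) {
        out.writeUTF(entry.getKey());
        out.writeLong(entry.getValue());
      }
    }
    return bytes.toByteArray();
  }

  @Override
  public Map<String, Long> deserialize(int version, byte[] serialized) throws IOException {
    // The version lets a future layout change keep reading state written by older code.
    if (version != VERSION) {
      throw new IOException("Unknown version: " + version);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
      int size = in.readInt();
      Map<String, Long> statistics = new LinkedHashMap<>();
      for (int i = 0; i < size; i++) {
        String key = in.readUTF();
        statistics.put(key, in.readLong());
      }
      return statistics;
    }
  }

  /** Convenience helper for the sketch: serialize then deserialize, rethrowing unchecked. */
  static Map<String, Long> roundTrip(Map<String, Long> statistics) {
    MapStatisticsSerializerSketch serializer = new MapStatisticsSerializerSketch();
    try {
      return serializer.deserialize(serializer.getVersion(), serializer.serialize(statistics));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Unlike a generic Kryo or Java-serialization fallback, the byte layout here is defined by the code itself and tagged with a version, which is what makes checkpointed state readable across upgrades.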





[GitHub] [iceberg] stevenzwu commented on pull request #7269: Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#issuecomment-1499231745

   Thanks @yegangy0718 for the contribution and @hililiwei for the review.

