Posted to issues@iceberg.apache.org by "yegangy0718 (via GitHub)" <gi...@apache.org> on 2023/03/04 07:20:24 UTC

[GitHub] [iceberg] yegangy0718 commented on a diff in pull request #6382: Implement ShuffleOperator to collect data statistics

yegangy0718 commented on code in PR #6382:
URL: https://github.com/apache/iceberg/pull/6382#discussion_r1125391014


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle.statistics;
+
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+@Internal
+public class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> statistics = Maps.newHashMap();
+
+  @Override
+  public boolean isEmpty() {
+    return statistics.size() == 0;
+  }
+
+  @Override
+  public void add(K key) {
+    // increase count of occurrence by one in the dataStatistics map
+    statistics.put(key, statistics.getOrDefault(key, 0L) + 1L);
+  }
+
+  @Override
+  public void merge(DataStatistics<K> otherStatistics) {
+    Preconditions.checkArgument(
+        otherStatistics instanceof MapDataStatistics,
+        "Can not merge this type of statistics: " + otherStatistics);
+    MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) otherStatistics;

Review Comment:
   I think that if the input is not of the right type, it is better to throw `IllegalArgumentException`.
   I checked other places as well, for example https://github.com/apache/iceberg/blob/9cf9ca23eef196b3615f14dfa5cf6024dff85f6d/api/src/main/java/org/apache/iceberg/SortOrder.java#L245
   and https://github.com/apache/iceberg/blob/9cf9ca23eef196b3615f14dfa5cf6024dff85f6d/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java#L89
   It's common to check the type first and then cast to the specific type.
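
   A minimal sketch of that check-then-cast pattern, written in plain Java rather than with the relocated Guava `Preconditions` so that it stands alone. `Stats`, `MapStats`, and `OtherStats` are hypothetical stand-ins for illustration, not the classes in this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the DataStatistics interface discussed above.
interface Stats<K> {
  void add(K key);

  void merge(Stats<K> other);
}

// Hypothetical map-backed implementation that counts key frequency.
class MapStats<K> implements Stats<K> {
  private final Map<K, Long> counts = new HashMap<>();

  @Override
  public void add(K key) {
    counts.put(key, counts.getOrDefault(key, 0L) + 1L);
  }

  @Override
  public void merge(Stats<K> other) {
    // Check the type first and fail with IllegalArgumentException on a mismatch.
    if (!(other instanceof MapStats)) {
      throw new IllegalArgumentException("Can not merge this type of statistics: " + other);
    }
    // After the check, the cast to the specific type cannot fail.
    MapStats<K> that = (MapStats<K>) other;
    that.counts.forEach((key, count) -> counts.merge(key, count, Long::sum));
  }

  long count(K key) {
    return counts.getOrDefault(key, 0L);
  }
}

// Hypothetical second implementation, present only to exercise the type check.
class OtherStats<K> implements Stats<K> {
  @Override
  public void add(K key) {}

  @Override
  public void merge(Stats<K> other) {}
}
```

   With this shape, passing an `OtherStats` to `MapStats#merge` fails fast with `IllegalArgumentException` instead of a `ClassCastException` from the cast.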



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatisticsFactory;
+import org.apache.iceberg.flink.sink.shuffle.statistics.MapDataStatistics;
+import org.apache.iceberg.flink.sink.shuffle.statistics.MapDataStatisticsFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataStatisticsOperator {

Review Comment:
   Yes. In the current `DataStatisticsOperator` implementation, we set `globalStatisticsState` in `DataStatisticsOperator#snapshotState`. But to get `globalStatisticsState`, we need to implement `DataStatisticsOperator#handleOperatorEvent`, which we plan to do in another PR.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/statistics/MapDataStatistics.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle.statistics;
+
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** MapDataStatistics uses map to count key frequency */
+@Internal
+public class MapDataStatistics<K> implements DataStatistics<K> {
+  private final Map<K, Long> statistics = Maps.newHashMap();
+
+  @Override
+  public boolean isEmpty() {
+    return statistics.size() == 0;
+  }
+
+  @Override
+  public void add(K key) {
+    // increase count of occurrence by one in the dataStatistics map
+    statistics.put(key, statistics.getOrDefault(key, 0L) + 1L);

Review Comment:
   Will update in the latest commit.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsAndRecordWrapper.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import org.apache.iceberg.flink.sink.shuffle.statistics.DataStatistics;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * The wrapper class for data statistics and record. It is the only way for data statistics operator to send
+ * global data statistics to custom partitioner to distribute data based on statistics
+ *
+ * <p>DataStatisticsAndRecordWrapper is sent from {@link DataStatisticsOperator} to partitioner. It
+ * contains either data statistics(globally aggregated) or a record. Once partitioner receives the data
+ * statistics, it will use that to decide the coming record should send to which writer subtask. After
+ * shuffling, a filter and mapper are required to filter out the data distribution weight, unwrap the
+ * object and extract the original record type T.
+ */
+public class DataStatisticsAndRecordWrapper<T, K> implements Serializable {

Review Comment:
   The reason I renamed it is that `DataStatisticsOperator` generates an object that contains either global data statistics or a record, while the shuffle itself happens at the partitioner. The name `DataStatisticsAndRecord` is closer to how the class is used (transmitting global data statistics).
   
   Indeed, as you said, the other classes that use `And` contain both. For example, the `RecordAndPosition` class contains both a record and a position.
   How about naming it `DataStatisticsOrRecord`, even though there is no such `Or` class in the current repo :(
   WDYT?
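
   To make the either/or semantics concrete, here is a rough sketch of what a `DataStatisticsOrRecord` could look like. The factory methods, accessors, and type parameters `S` and `T` are assumptions for illustration only, not the code in this PR:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical sketch: holds exactly one of statistics (S) or a record (T), never both.
final class DataStatisticsOrRecord<S, T> implements Serializable {
  private static final long serialVersionUID = 1L;

  private final S statistics;
  private final T record;

  private DataStatisticsOrRecord(S statistics, T record) {
    this.statistics = statistics;
    this.record = record;
  }

  // Wraps globally aggregated statistics; the record side stays empty.
  static <S, T> DataStatisticsOrRecord<S, T> fromStatistics(S statistics) {
    return new DataStatisticsOrRecord<>(Objects.requireNonNull(statistics, "statistics"), null);
  }

  // Wraps a single record; the statistics side stays empty.
  static <S, T> DataStatisticsOrRecord<S, T> fromRecord(T record) {
    return new DataStatisticsOrRecord<>(null, Objects.requireNonNull(record, "record"));
  }

  boolean hasStatistics() {
    return statistics != null;
  }

  boolean hasRecord() {
    return record != null;
  }

  S statistics() {
    if (statistics == null) {
      throw new IllegalStateException("This wrapper holds a record, not statistics");
    }
    return statistics;
  }

  T record() {
    if (record == null) {
      throw new IllegalStateException("This wrapper holds statistics, not a record");
    }
    return record;
  }
}
```

   Because the constructor is private and each factory method fills only one side, an instance can never hold both values, which is the property the `Or` name is meant to convey.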
   


