Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/26 21:46:46 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460577287



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: copy the partition key before storing it, because partition(row) may reuse and mutate the same instance, corrupting the keys already in the writers map.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       This code handles the case where we haven't seen this partition key yet. This is especially likely to happen when users did not `keyBy` or otherwise pre-shuffle the data according to the partition key.
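   (Aside, partly for my own understanding: the `partitionKey.copy()` above looks necessary because `partition(row)` is allowed to reuse one mutable key instance, and a key object that gets mutated after insertion silently breaks `HashMap` lookups for the earlier entries. Here is a standalone Java illustration of that hazard, using a made-up `MutableKey` class rather than Iceberg's `PartitionKey`:)

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Standalone illustration (not Iceberg code): why a mutable, reused key must be copied
// before it is stored in a HashMap. Mutating the stored key afterwards changes its
// equals/hashCode and corrupts lookups for entries that were added earlier.
public class MutableKeyHazard {

  // Made-up stand-in for a reusable partition key.
  static final class MutableKey {
    String value;

    MutableKey set(String newValue) {
      this.value = newValue;
      return this;
    }

    MutableKey copy() {
      return new MutableKey().set(value);
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof MutableKey && Objects.equals(value, ((MutableKey) other).value);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(value);
    }
  }

  public static void main(String[] args) {
    Map<MutableKey, String> writers = new HashMap<>();
    MutableKey reused = new MutableKey();

    // First "row": the key is stored WITHOUT copying.
    writers.put(reused.set("2020-07-26"), "appender-1");

    // Second "row": the same instance is re-populated, silently rewriting the stored key.
    reused.set("2020-07-27");

    // The appender for 2020-07-26 is now unreachable.
    System.out.println(writers.get(new MutableKey().set("2020-07-26")));  // prints: null

    // Copying before insertion, i.e. writers.put(reused.copy(), ...), avoids this.
  }
}
```

   So copying the key before putting it in the map, as this PR does, seems like exactly the right call.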
   
   Is pre-shuffling something users should do before writing to the table (either `keyBy` or `ORDER BY` in Flink SQL)? I understand this is specifically a `PartitionedFanoutWriter`, so it makes sense that keys might not always arrive grouped together (and even when users did `keyBy` the partition key, if the number of TaskManager slots doing the writing does not equal the cardinality of the partition key, you'll still wind up with multiple RollingFileAppenders in a single Flink writing task, and thus fanout). However, for long-running streaming queries, it's possible that this TaskManager doesn't see a given partition key again for days or even weeks (especially not at a high enough volume to emit a complete file of the given target file size).
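   For concreteness, here is a minimal, hypothetical sketch of the kind of pre-shuffle I mean. The CSV-ish input, the key extraction, and the `.print()` sink are all made up for illustration; a real job would attach the Iceberg Flink sink where `.print()` is:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PreShuffleSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in records: the first field plays the role of the table's partition column.
    DataStream<String> events = env.fromElements(
        "logs,2020-07-26,foo",
        "metrics,2020-07-26,bar",
        "logs,2020-07-27,baz");

    events
        // keyBy on the partition column so that each writer subtask sees as few distinct
        // partition keys as possible, keeping the fanout writer's map of open
        // RollingFileAppenders small.
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String line) {
            return line.split(",")[0];
          }
        })
        // Placeholder sink: a real job would attach the Iceberg Flink sink here instead.
        .print();

    env.execute("pre-shuffled write sketch");
  }
}
```

   The point is just that keying on the partition columns bounds how many distinct partition keys any single writer subtask has to hold open at once.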
   
   I guess my concern is that users could wind up with a very high cardinality of keys on a single TaskManager, either because they didn't pre-shuffle their data or because an imbalance between the cardinality of the partition key and the parallelism at the write stage means records don't naturally group together enough to emit an entire file. Or, as another edge case, one partition key value might simply not be common enough to ever fill a file from this `PartitionedFanoutWriter`.
   
   IIUC, if the `PartitionedFanoutWriter` does not see this partition key again on this TaskManager often enough to fill a file of the target size, a file containing this data won't be written until `close` is called. For very long-running streaming jobs, that could mean days or even weeks in my experience, and it could also produce small files at `close`. Is this a concern that Iceberg should take into consideration, or is it left to users to handle when tuning their Flink queries?
   
   I imagine that with S3, data locality of a file written long after the data was received is not a major concern: the manifest file tells whatever query engine reads this table which objects in the S3 bucket to grab, so locality is largely abstracted away from the user. But what if the user is on HDFS? Could this lead to performance issues (or even correctness issues) on read if records with relatively similar timestamps end up scattered across a potentially large number of files?
   
   I suppose this amounts to three concerns (forgive me if these are non-issues; I'm still new to the project, though not new to Flink, so this is partly to help me understand as well as to raise what I was wondering about while reading this code):
   1) Should we be concerned that a writer won't emit a file until the streaming query is closed, due to the case described above? We could possibly track how long each writer has existed and roll its file once it has been open far too long (however that threshold would be determined); a rough sketch of that idea is below this list.
   2) If a record comes in at some time, and the file containing that record isn't written until much later (on the order of days or weeks), could this lead to correctness problems or serious performance problems when a query engine reads this table?
   3) Would it be beneficial to at least emit a warning or info-level log suggesting that users pre-partition their data according to the partition spec when the number of unique `RollingFileAppender` writers gets too high for a given Flink writer slot / TaskManager? Admittedly, it might be difficult to come up with a heuristic for when this is actually a problem versus just the natural mismatch between the parallelism of the writing task slots and the cardinality of the partition key.
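   To make concerns 1 and 3 concrete, here is a very rough standalone sketch (not a proposal for this PR; the `Appender` interface, the thresholds, and the use of wall-clock time are all arbitrary assumptions on my part) of what rolling idle appenders and warning on high fanout could look like in a write path:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Standalone sketch (not Iceberg code): a fanout-style writer that remembers when each
// per-partition appender was opened, rolls appenders that have been open too long, and
// logs a warning once the number of open appenders crosses a threshold.
public class IdleAwareFanoutSketch<T> {

  // Minimal stand-in for the per-partition rolling file appender.
  interface Appender<R> {
    void add(R row);
    void closeAndEmitFile();  // pretend this flushes whatever has been buffered so far
  }

  private static final long MAX_OPEN_MILLIS = 60 * 60 * 1000L;  // arbitrary: one hour
  private static final int WARN_OPEN_APPENDERS = 100;           // arbitrary threshold

  private final Map<String, Appender<T>> appenders = new HashMap<>();
  private final Map<String, Long> openedAt = new HashMap<>();

  public void write(String partitionKey, T row) {
    long now = System.currentTimeMillis();

    // Concern 1: roll any appender that has been open longer than the cutoff so its data
    // is emitted without waiting for the job-level close().
    Iterator<Map.Entry<String, Long>> ages = openedAt.entrySet().iterator();
    while (ages.hasNext()) {
      Map.Entry<String, Long> entry = ages.next();
      if (now - entry.getValue() > MAX_OPEN_MILLIS) {
        appenders.remove(entry.getKey()).closeAndEmitFile();
        ages.remove();
      }
    }

    Appender<T> appender = appenders.computeIfAbsent(partitionKey, key -> {
      openedAt.put(key, now);
      if (appenders.size() + 1 == WARN_OPEN_APPENDERS) {
        // Concern 3: hint that the input probably is not pre-shuffled by partition key.
        System.err.println("WARN: " + WARN_OPEN_APPENDERS
            + " open appenders in one task; consider keyBy on the partition columns");
      }
      return newAppender(key);
    });

    appender.add(row);
  }

  protected Appender<T> newAppender(String partitionKey) {
    // A real writer would open a rolling file appender for this partition here.
    return new Appender<T>() {
      @Override public void add(T row) { /* no-op in this sketch */ }
      @Override public void closeAndEmitFile() { /* no-op in this sketch */ }
    };
  }
}
```

   A real writer would obviously need to route the rolled files into the task's list of completed data files rather than a made-up `closeAndEmitFile()`, and would presumably use a logger instead of stderr, but hopefully it shows the shape of the idea.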





