Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/16 13:31:12 UTC

[GitHub] [iceberg] openinx opened a new pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

openinx opened a new pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213


   While implementing PR https://github.com/apache/iceberg/pull/1145, I found that the Flink TaskWriter shares most of its code with Spark's. So I did some abstraction to move the common logic into the `iceberg-core` module, so that both engines can share it.
   
   FYI @rdblue.




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463351988



##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -17,24 +17,42 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.sql.catalyst.InternalRow;
 
-class UnpartitionedWriter extends BaseWriter {
-  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileWriter currentWriter = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
 
-    openCurrent();
+  @Override
+  public void write(T record) throws IOException {
+    if (currentWriter == null) {
+      currentWriter = new RollingFileWriter(null);
+    }

Review comment:
       I think this PR should not change writers to be lazily created.
   
   First, it changes the assumptions in the writers, which doesn't make sense to include in what is primarily a refactor.
   
   Second, I think those assumptions were a better structure for these classes. Opening the file in the constructor and relying on it always being there avoids a null check in `write`, which is called in a tight loop. The main benefit of this is to avoid a delete in close when no records were written, but that check is still present in `RollingFileWriter`. And I think that check _should_ be there because it is another helpful invariant: if a 0-record file is produced by any writer wrapped by `RollingFileWriter`, then it should be discarded. That helps avoid the problem in future implementations, which may not consider the case.
   
   This is fairly minor, but since there are other changes needed (in particular, the array fix for task commit messages), I'd like to change at least the Spark writers back to eagerly creating output files instead of lazily checking for null in `write`.
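    For reference, a minimal sketch of the eager approach (hedged: the rolling writer's method name and exact shape are assumptions based on the diff above, not the final code):
    
    ```java
    // Sketch: open the rolling writer eagerly in the constructor so that
    // write() is a plain delegation with no null check in the hot loop.
    public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                               OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
      this.currentWriter = new RollingFileWriter(null);  // unpartitioned: no partition key
    }
    
    @Override
    public void write(T record) throws IOException {
      currentWriter.write(record);  // tight loop: no branching needed
    }
    ```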






[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460684652



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
##########
@@ -63,10 +66,29 @@ public void write(InternalRow row) throws IOException {
         throw new IllegalStateException("Already closed files for partition: " + key.toPath());
       }
 
-      setCurrentKey(key.copy());
-      openCurrent();
+      currentKey = key.copy();
+    }
+
+    if (currentAppender == null) {
+      currentAppender = new RollingFileAppender(currentKey);

Review comment:
    Now we've changed to maintain the partitionKey inside the `RollingFileWriter` (we discussed this before; for the fanout writer we may have multiple writers appending records), so creating the RollingFileAppender is what actually sets the partition key. I did not open the appender here because we only need to open one when there is a real record to write (this avoids opening an appender without ever writing a record); all of that logic is hidden inside the RollingFileAppender.
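    A minimal sketch of that lazy-open behavior (the field and helper names here are assumptions for illustration, not the exact PR code):
    
    ```java
    // Inside the rolling appender (sketch): the underlying FileAppender is only
    // created when the first record arrives, so an appender that never receives
    // a record never produces a 0-record file.
    void write(T record) throws IOException {
      if (currentAppender == null) {
        openCurrent();  // create the output file and FileAppender on the first record
      }
      currentAppender.add(record);
      if (shouldRollToNewFile()) {  // e.g. a size check every ROWS_DIVISOR records
        closeCurrent();             // complete the current data file
      }
    }
    ```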






[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460637387



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     RowDataReader dataReader = new RowDataReader(
         task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
 
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
-        properties, schema, SparkSchemaUtil.convert(schema));
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);

Review comment:
    I changed this because `SparkPartitionedWriter` will need Spark's `StructType` to decide how to partition, so I moved `StructType structType = SparkSchemaUtil.convert(schema);` onto a separate line.
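    For context, a sketch of where the extracted `structType` ends up being used, mirroring the Flink `RowPartitionedFanoutWriter` shown later in this thread and assuming `PartitionedWriter` declares the same abstract `partition(T)` hook (the actual `SparkPartitionedWriter` in the PR may differ):
    
    ```java
    // Sketch: the Spark StructType lets the wrapper read InternalRow fields with
    // the right types so a PartitionKey can be computed for each row.
    class SparkPartitionedWriter extends PartitionedWriter<InternalRow> {
      private final PartitionKey partitionKey;
      private final InternalRowWrapper internalRowWrapper;
    
      SparkPartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                             OutputFileFactory fileFactory, FileIO io, long targetFileSize,
                             Schema schema, StructType sparkSchema) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
        this.partitionKey = new PartitionKey(spec, schema);
        this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
      }
    
      @Override
      protected PartitionKey partition(InternalRow row) {
        partitionKey.partition(internalRowWrapper.wrap(row));
        return partitionKey;
      }
    }
    ```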






[GitHub] [iceberg] openinx commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-663985528


   Ping @rdblue. I think this issue is currently the biggest blocker to moving the Flink sink connector forward; please take a look when you have time, thanks.




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463344226



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     RowDataReader dataReader = new RowDataReader(
         task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
 
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
-        properties, schema, SparkSchemaUtil.convert(schema));
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);

Review comment:
       Got it. I see it's used later.






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460583237



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
##########
@@ -17,41 +17,44 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PartitionedWriter extends BaseWriter {
+public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
-  private final PartitionKey key;
-  private final InternalRowWrapper wrapper;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
-  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+  private PartitionKey currentKey = null;
+  private RollingFileAppender currentAppender = null;

Review comment:
       Now that the current key is null, we will need a check before adding it to `completedPartitions` in the `write` method:
   
   ```java
   if (!key.equals(currentKey)) {
     closeCurrent();
     if (currentKey != null) {
       // if the key is null, there was no previous current key
       completedPartitions.add(currentKey);
     }
   }
   ```






[GitHub] [iceberg] openinx commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-661551337


   Yeah, you're right. Please review this patch first when you have time.




[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460589483



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy the partition key here, to avoid messing up the keys already stored in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
    Ah ok. I hadn't realized that was the plan.
    
    I wrote a parquet writer for Flink way back when Flink did not support it, and outputting files on checkpoint was the only real solution I could come up with.
    
    It also involved forking the base parquet library, so we wound up abandoning it: we don't really have the engineering headcount to constantly update and maintain something like that. Even though Flink can now write parquet files, that experience is why I'm interested in this project, along with the numerous data lake features it supports.
    
    Thanks for the info @rdblue!






[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463466655



##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -17,24 +17,42 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.sql.catalyst.InternalRow;
 
-class UnpartitionedWriter extends BaseWriter {
-  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileWriter currentWriter = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
 
-    openCurrent();
+  @Override
+  public void write(T record) throws IOException {
+    if (currentWriter == null) {
+      currentWriter = new RollingFileWriter(null);
+    }

Review comment:
       Fine, that sounds reasonable. 






[GitHub] [iceberg] rdblue commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-667319207


   Thanks, @openinx! I fixed the minor problem that caused tests to fail and merged this.




[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460577287



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy the partition key here, to avoid messing up the keys already stored in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
    This code handles the case where we haven't seen this partition key yet. This is especially likely when users did not `keyBy` or otherwise pre-shuffle the data according to the partition key.
    
    Is pre-shuffling something users should do before writing to the table (either `keyBy` or `ORDER BY` in Flink SQL)? I understand that this is specifically a `PartitionedFanoutWriter`, so it makes sense that keys might not always arrive together (and even when users did `keyBy` the partition key, if the number of TaskManager slots that are writing does not equal the cardinality of the partition key, you will still wind up with multiple RollingFileAppenders in a single Flink writing task, and thus fanout). However, for long-running streaming queries, it's possible that a TaskManager doesn't see a given partition key again for days or even weeks (especially at a high enough volume to emit a complete file of the given target file size).
    
    My concern is that users could wind up with a very high cardinality of keys on a single TaskManager, either because they didn't pre-shuffle their data, or because there is an imbalance between the cardinality of the partition key and the parallelism at the write stage, so that records don't naturally group together enough to emit an entire file. Or, as another edge case, one partition key value is simply not common enough for this `PartitionedFanoutWriter` to ever emit a full file for it.
    
    IIUC, if the `PartitionedFanoutWriter` does not see a partition key often enough in this TaskManager to emit a full file, a file containing that data won't be written until `close` is called. For very long-running streaming jobs, this could take days or even weeks in my experience, and it could also produce small files on `close`. Is this a concern Iceberg should take into consideration, or is it left to users to handle when tuning their Flink queries?
    
    I imagine that with S3, the locality of a file written much later than the timestamps of its data is not a major concern: the manifest file tells whatever query engine reads this table which keys in the S3 bucket to grab, so the locality issue is largely abstracted away from the user. But what if the user is on HDFS? Could this lead to performance issues (or even correctness issues) on read if records with relatively similar timestamps are scattered across a potentially large number of files because of when their RollingFileAppender happened to flush?
    
    I suppose this amounts to three concerns (and forgive me if these are non-issues; I am still new to the project, though not new to Flink, so this is partly to help me understand as well as to lay out my concerns from reading this code):
    1) Should we be concerned that a writer won't emit a file until a streaming query is closed, due to the case described above? We could possibly track how long each writer has existed and emit a file once it has been open far too long (however that is determined; a rough sketch of this idea follows after this list).
    2) If a record comes in at some time, and the file containing that record isn't written until much later (on the order of days or weeks), could this lead to correctness problems or very large performance problems when a query engine reads this table?
    3) Would it be beneficial to emit at least a warning or info-level log telling the user to pre-partition their data according to the partition spec when the number of unique `RollingFileAppender` writers gets too high for a given Flink writer slot / TaskManager? Admittedly, it might be difficult to find a heuristic for when this is a real problem rather than just the natural difference between the write parallelism and the cardinality of the partition key.
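    For concern 1, a very rough sketch of the kind of age-based flush I have in mind (entirely hypothetical; nothing like this exists in the PR, and `MAX_WRITER_AGE_MS` is an invented tuning knob):
    
    ```java
    // Hypothetical: close a partition's rolling appender once it has been open too
    // long, so a long-running streaming job doesn't buffer a rare key indefinitely.
    private final Map<PartitionKey, Long> writerCreateTimes = Maps.newHashMap();
    private static final long MAX_WRITER_AGE_MS = TimeUnit.HOURS.toMillis(1);
    
    private void maybeFlushStaleWriters() throws IOException {
      long now = System.currentTimeMillis();
      Iterator<Map.Entry<PartitionKey, RollingFileAppender>> iter = writers.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<PartitionKey, RollingFileAppender> entry = iter.next();
        long createdAt = writerCreateTimes.getOrDefault(entry.getKey(), now);
        if (now - createdAt > MAX_WRITER_AGE_MS) {
          entry.getValue().close();  // emit whatever has accumulated, even a small file
          iter.remove();
          writerCreateTimes.remove(entry.getKey());
        }
      }
    }
    ```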






[GitHub] [iceberg] rdblue commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-664054044


   Thanks, @openinx! The `RollingFileAppender` looks much better. Just a few minor things to take care of to minimize the changes at this point. Otherwise I think this mostly looks ready.






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460584199



##########
File path: flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.flink.data.FlinkAvroWriter;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.parquet.Parquet;
+
+class TaskWriterFactory {
+  private TaskWriterFactory() {
+  }
+
+  static TaskWriter<Row> createTaskWriter(Schema schema,
+                                          PartitionSpec spec,
+                                          FileFormat format,
+                                          FileAppenderFactory<Row> appenderFactory,
+                                          OutputFileFactory fileFactory,
+                                          FileIO io,
+                                          long targetFileSizeBytes) {
+    if (spec.fields().isEmpty()) {
+      return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes);
+    } else {
+      return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory,
+          io, targetFileSizeBytes, schema);
+    }
+  }
+
+  private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {
+
+    private final PartitionKey partitionKey;
+    private final RowWrapper rowWrapper;
+
+    RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
+                               OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      this.partitionKey = new PartitionKey(spec, schema);
+      this.rowWrapper = new RowWrapper(schema.asStruct());
+    }
+
+    @Override
+    protected PartitionKey partition(Row row) {
+      partitionKey.partition(rowWrapper.wrap(row));
+      return partitionKey;
+    }
+  }
+
+  static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> {
+    private final Schema schema;
+    private final Map<String, String> props;
+
+    FlinkFileAppenderFactory(Schema schema, Map<String, String> props) {
+      this.schema = schema;
+      this.props = props;
+    }
+
+    @Override
+    public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {

Review comment:
       #1232 and #1237 rebuild the Avro and Parquet writers to use `RowData` instead of `Row`. To deconflict, do you think it makes sense to get the base classes and Spark refactor in this PR and separate out the Flink side? I'm fine either way, whatever you think is going to be easier.






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459154981



##########
File path: core/src/main/java/org/apache/iceberg/taskio/OutputFileFactory.java
##########
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.taskio;

Review comment:
       Why not just use the existing `io` package? That, or maybe a `tasks` package.






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460582532



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> complete() throws IOException {
+    close();
+
+    if (completedFiles.size() > 0) {
+      return ImmutableList.copyOf(completedFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  class RollingFileAppender implements Closeable {

Review comment:
       Minor: This doesn't implement `FileAppender`, so maybe `RollingFileWriter` would make more sense?






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459160008



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,

Review comment:
       Okay, looks like this is intended to support the fan-out writer because the current `BaseWriter` only supports one open file at a time. I see why there are more changes.






[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459196795



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,
+                                                Supplier<EncryptedOutputFile> outputFileSupplier) {
+    EncryptedOutputFile outputFile = outputFileSupplier.get();
+    FileAppender<T> appender = appenderFactory.newAppender(outputFile.encryptingOutputFile(), format);
+    return new WrappedFileAppender(partitionKey, outputFile, appender);
+  }
+
+  class WrappedFileAppender {

Review comment:
    I created this class because the fanout writer will have several open writers, and when building the `DataFile` we need all the information for a given FileAppender, such as the partitionKey, the EncryptedOutputFile, etc. The previous Spark implementation didn't need this class because all of that context was maintained inside the PartitionedWriter (the currentXXX fields), but that doesn't work for the fanout writer. It is better to have a class that holds this information for building the `DataFile`.
   
   > It would make sense to keep this class if it completely encapsulated the logic of rolling new files
    
    Good point. I'll make the WrappedFileAppender handle all of the rolling logic; let me refactor this.
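    A sketch of the encapsulated rolling behavior being agreed on here (the `ROWS_DIVISOR` cadence comes from the diff above; the helper name and overall shape are assumptions, not the final code):
    
    ```java
    // Sketch: the wrapper owns the roll decision, checking the appender's length
    // only every ROWS_DIVISOR records to avoid calling length() on every write.
    void add(T record) throws IOException {
      appender.add(record);
      recordCount += 1;
      if (recordCount % ROWS_DIVISOR == 0 && appender.length() >= targetFileSize) {
        closeCurrentAndOpenNext();  // complete the DataFile, then start a fresh output file
      }
    }
    ```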
    







[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460589483



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       As I think about it more, this is probably not a concern that needs to be addressed in this PR. The purpose of this PR is to abstract the generic task writers so they can be shared between Flink and Spark.
   
   However, I would like to discuss further whether we should consider this issue for longer-running streaming programs in general, where a row for a rarely seen partition reaches a writing TaskManager and its file is not emitted for a long time. Having read further through the docs and the existing code base, I don't think this could affect correctness, but I still think it might cause performance issues during scan planning when reading from the partitioned table.
   
   During scan planning, IIUC, an inclusive projection could match a very large number of rows that fall outside of the predicate range if the `RollingFileAppender` for a rarely observed partition at this TaskManager buffers its data for a very long time before writing (say days or even weeks in a longer-running streaming query).
   
   I definitely don't think this needs to be tackled in this PR, but I would like to discuss what we expect to happen in this situation and how downstream systems that read this table will handle it. To me, this is different from the Spark streaming writer, since that still uses the batch writer due to Spark's microbatch processing.
   
   cc @JingsongLi to see if my concern here is well founded or if I'm simply misunderstanding Iceberg's intended behavior during read and write. It's possible that if users are only using the Blink planner, they are also still using microbatches, which would cover my concern.
   
   I can attempt to come up with an example that further demonstrates my concern if need be, though I don't think it should block this PR. At the very least, this long-buffered data might not be observed by downstream systems until a much later snapshot than the snapshots committed around the time the rows first entered this writer's buffer, which seems like unexpected behavior on read.




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463344052



##########
File path: site/docs/javadoc/0.9.0/serialized-form.html
##########
@@ -1031,7 +1031,7 @@ <h4>spec</h4>
 <li class="blockList"><a id="org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit">
 <!--   -->
 </a>
-<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit extends org.apache.iceberg.spark.source.TaskResult implements Serializable</h3>
+<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit implements Serializable</h3>

Review comment:
       The Javadoc for a release should not be modified. I think this is probably a search and replace error.




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460583043



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
##########
@@ -63,10 +66,29 @@ public void write(InternalRow row) throws IOException {
         throw new IllegalStateException("Already closed files for partition: " + key.toPath());
       }
 
-      setCurrentKey(key.copy());
-      openCurrent();
+      currentKey = key.copy();
+    }
+
+    if (currentAppender == null) {
+      currentAppender = new RollingFileAppender(currentKey);

Review comment:
       It would be nice to not change the logic for opening an appender. Before, this was part of the flow of changing partitions and I don't see any value in moving it.
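   
   A hedged sketch of what that flow looks like when the appender is opened as part of the key change; the names follow the diff above, and `closeCurrent` and `completedPartitions` are assumed from the surrounding class:
   
   ```java
     PartitionKey key = partition(row);
   
     if (!key.equals(currentKey)) {
       closeCurrent();  // finish the previous partition's file
   
       if (completedPartitions.contains(key)) {
         throw new IllegalStateException("Already closed files for partition: " + key.toPath());
       }
   
       currentKey = key.copy();
       currentAppender = new RollingFileAppender(currentKey);  // opened with the key change
     }
   
     currentAppender.add(row);
   ```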




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459162715



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,
+                                                Supplier<EncryptedOutputFile> outputFileSupplier) {
+    EncryptedOutputFile outputFile = outputFileSupplier.get();
+    FileAppender<T> appender = appenderFactory.newAppender(outputFile.encryptingOutputFile(), format);
+    return new WrappedFileAppender(partitionKey, outputFile, appender);
+  }
+
+  class WrappedFileAppender {
+    private final PartitionKey partitionKey;
+    private final EncryptedOutputFile encryptedOutputFile;
+    private final FileAppender<T> appender;
+
+    private boolean closed = false;
+    private long currentRows = 0;
+
+    WrappedFileAppender(PartitionKey partitionKey, EncryptedOutputFile encryptedOutputFile, FileAppender<T> appender) {
+      this.partitionKey = partitionKey;
+      this.encryptedOutputFile = encryptedOutputFile;
+      this.appender = appender;
+    }
+
+    void add(T record) {
+      this.appender.add(record);
+      this.currentRows++;
+    }
+
+    boolean shouldRollToNewFile() {
+      //TODO: ORC file now not support target file size before closed

Review comment:
       We should consider changing the ORC appender to simply return 0 if the file isn't finished. That way this check is still valid, but the file will never be rolled.
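   
   A sketch of the idea, assuming a `closed` flag and a length captured on close (this is not the actual OrcFileAppender code):
   
   ```java
     @Override
     public long length() {
       // 0 means "size unknown yet": the rolling check (length() >= targetFileSize)
       // stays valid but never triggers a roll while the ORC file is open
       return closed ? finalLength : 0;
     }
   ```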




[GitHub] [iceberg] openinx commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-666400663


   Ping @rdblue, mind taking another look? Thanks.


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460585801



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       > Should we be concerned that a writer won't emit a file until a streaming query is closed due to the previously mentioned case?
   
   I think that the intent is to close and emit all of the files at each checkpoint, rather than keeping them open. That is required to achieve exactly-once writes because the data needs to be committed to the table.
   
   I think that also takes care of your second question because data is constantly added to the table.
   
   > Would it be beneficial to at least emit a warning or info level log to the user that it might be beneficial to pre-partition their data according to the partition key spec . . .
   
   I think a reasonable thing to do is to limit the number of writers that are kept open, to bound the resources that are held. Then you can either fail if you go over that limit, or close and release files with an LRU policy. Failing brings the problem to the user's attention immediately and is similar to what we do on the Spark side, which doesn't allow writing new data to a partition after it is finished. That ensures that data is either clustered for the write, or the job fails.
   
   The long-term plan for Spark is to be able to influence the logical plan that is writing to a table. That would be the equivalent of adding an automatic `keyBy` or rough `orderBy` for Flink. I think we would eventually want to do this for Flink as well, but I'm not sure what data clustering and sorting operations are supported currently.
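   
   For the LRU option, a hedged sketch of a bounded writer map; the cap and eviction behavior are illustrative, not a committed design:
   
   ```java
     private static final int MAX_OPEN_WRITERS = 100;  // illustrative cap
   
     private final Map<PartitionKey, RollingFileAppender> writers =
         new LinkedHashMap<PartitionKey, RollingFileAppender>(16, 0.75f, true) {  // access-order
           @Override
           protected boolean removeEldestEntry(Map.Entry<PartitionKey, RollingFileAppender> eldest) {
             if (size() > MAX_OPEN_WRITERS) {
               try {
                 eldest.getValue().close();  // release the file as a completed DataFile
               } catch (IOException e) {
                 throw new UncheckedIOException(e);
               }
               return true;  // evict the least-recently-used writer
             }
             return false;
           }
         };
   ```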




[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463436266



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -209,9 +212,15 @@ public String toString() {
     return String.format("IcebergWrite(table=%s, format=%s)", table, format);
   }
 
-  private static class TaskCommit extends TaskResult implements WriterCommitMessage {
-    TaskCommit(TaskResult toCopy) {
-      super(toCopy.files());
+  private static class TaskCommit implements WriterCommitMessage {
+    private final List<DataFile> taskFiles;

Review comment:
       Fine,  sounds good.  

##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> complete() throws IOException {
+    close();
+
+    if (completedFiles.size() > 0) {
+      return ImmutableList.copyOf(completedFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected class RollingFileWriter implements Closeable {
+    private static final int ROWS_DIVISOR = 1000;
+    private final PartitionKey partitionKey;
+
+    private EncryptedOutputFile currentFile = null;
+    private FileAppender<T> currentAppender = null;
+    private long currentRows = 0;
+
+    public RollingFileWriter(PartitionKey partitionKey) {
+      this.partitionKey = partitionKey;
+    }
+
+    public void add(T record) throws IOException {
+      if (currentAppender == null) {
+        openCurrent();
+      }
+
+      this.currentAppender.add(record);
+      this.currentRows++;
+
+      if (shouldRollToNewFile()) {
+        closeCurrent();
+      }
+    }
+
+    private void openCurrent() {
+      if (spec.fields().size() == 0) {

Review comment:
       Yeah, it makes sense. Thanks.
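   
   The hunk above is cut off at that branch; for context, a hedged reconstruction of what the two branches plausibly do. The suggestion being agreed with is not quoted in this thread, and the factory method names are assumptions:
   
   ```java
     private void openCurrent() {
       if (spec.fields().size() == 0) {
         this.currentFile = fileFactory.newOutputFile();             // unpartitioned table
       } else {
         this.currentFile = fileFactory.newOutputFile(partitionKey); // partitioned table
       }
       this.currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
       this.currentRows = 0;
     }
   ```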

##########
File path: site/docs/javadoc/0.9.0/serialized-form.html
##########
@@ -1031,7 +1031,7 @@ <h4>spec</h4>
 <li class="blockList"><a id="org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit">
 <!--   -->
 </a>
-<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit extends org.apache.iceberg.spark.source.TaskResult implements Serializable</h3>
+<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit implements Serializable</h3>

Review comment:
       Yes, you are right. We shouldn't change the 0.9.0 Javadoc; let's revert it.

##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -17,24 +17,42 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.sql.catalyst.InternalRow;
 
-class UnpartitionedWriter extends BaseWriter {
-  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileWriter currentWriter = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
 
-    openCurrent();
+  @Override
+  public void write(T record) throws IOException {
+    if (currentWriter == null) {
+      currentWriter = new RollingFileWriter(null);
+    }
+    currentWriter.add(record);
   }
 
   @Override
-  public void write(InternalRow row) throws IOException {
-    writeInternal(row);
+  public void close() throws IOException {
+    closeCurrent();
+  }
+
+  private void closeCurrent() throws IOException {

Review comment:
       Yeah, its logic could just be moved to `close()`.
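   
   That fold is small; a sketch, assuming the `currentWriter` field from the diff above:
   
   ```java
     @Override
     public void close() throws IOException {
       // former closeCurrent() body, inlined
       if (currentWriter != null) {
         currentWriter.close();
         currentWriter = null;
       }
     }
   ```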

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -209,9 +212,15 @@ public String toString() {
     return String.format("IcebergWrite(table=%s, format=%s)", table, format);
   }
 
-  private static class TaskCommit extends TaskResult implements WriterCommitMessage {
-    TaskCommit(TaskResult toCopy) {
-      super(toCopy.files());
+  private static class TaskCommit implements WriterCommitMessage {
+    private final List<DataFile> taskFiles;

Review comment:
       Nice finding, makes sense.




[GitHub] [iceberg] openinx commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-667429944


   Thanks for the fix.


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460583381



##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileAppender currentAppender = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  @Override
+  public void write(T record) throws IOException {
+    if (currentAppender == null) {
+      currentAppender = new RollingFileAppender(null);

Review comment:
       Why not initialize `currentAppender` in the constructor? Then we don't need an additional null check in `write`, which is called in a tight loop.
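   
   A sketch of that change, using the names from the diff above:
   
   ```java
     public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                                OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
       super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
       // open the single rolling appender eagerly; null key because the table is unpartitioned
       this.currentAppender = new RollingFileAppender(null);
     }
   
     @Override
     public void write(T record) throws IOException {
       currentAppender.add(record);  // no branch in the hot loop
     }
   ```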




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463346744



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -209,9 +212,15 @@ public String toString() {
     return String.format("IcebergWrite(table=%s, format=%s)", table, format);
   }
 
-  private static class TaskCommit extends TaskResult implements WriterCommitMessage {
-    TaskCommit(TaskResult toCopy) {
-      super(toCopy.files());
+  private static class TaskCommit implements WriterCommitMessage {
+    private final List<DataFile> taskFiles;

Review comment:
       This should be an Array, like it was when this class was based on `TaskResult`. Arrays are easier to handle when working with Serializable classes because we don't have to worry about bugs caused by List implementations (like the recent Kryo bug with unmodifiable lists).
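   
   A sketch of the array-based message; the constructor shape is illustrative:
   
   ```java
     private static class TaskCommit implements WriterCommitMessage {
       // an array serializes predictably; List implementations have caused
       // serialization bugs before (e.g. Kryo with unmodifiable lists)
       private final DataFile[] taskFiles;
   
       TaskCommit(List<DataFile> files) {
         this.taskFiles = files.toArray(new DataFile[0]);
       }
   
       DataFile[] files() {
         return taskFiles;
       }
     }
   ```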




[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460589483



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       Ah ok, I hadn't realized that was the plan.
   
   I wrote a Parquet writer for Flink way back when Flink did not support it, and outputting files on checkpoint was the only real solution I could come up with.
   
   It also involved forking the base parquet library, so we wound up abandoning it, since we don't really have the engineering head count to constantly update and maintain something like that. Even though Flink can now write Parquet files, that experience is why I'm interested in this project, along with the numerous data lake features the project supports.
   
   Thanks for the info!




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459162289



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,
+                                                Supplier<EncryptedOutputFile> outputFileSupplier) {
+    EncryptedOutputFile outputFile = outputFileSupplier.get();
+    FileAppender<T> appender = appenderFactory.newAppender(outputFile.encryptingOutputFile(), format);
+    return new WrappedFileAppender(partitionKey, outputFile, appender);
+  }
+
+  class WrappedFileAppender {

Review comment:
       I don't see much value in this class. Its primary use is to keep track of whether a file is large enough to release, but it doesn't actually have any of the logic to do that. As a consequence, the code is now split across multiple places.
   
   This also has the logic for closing an appender and converting it to a `DataFile`, but that could just as easily be done in a `DataFile closeAppender(FileAppender appender)` method.
   
   It would make sense to keep this class if it completely encapsulated the logic of rolling new files. That would require some refactoring so that it could create new files using the file and appender factories. It would also require passing a `Consumer<DataFile>` so that it can release closed files. Otherwise, I think we should remove this class.
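   
   A hedged sketch of that fully encapsulated shape (all names here are illustrative, not the PR's code): the wrapper creates its own files and releases each closed one through a callback.
   
   ```java
     class RollingFileAppender implements Closeable {
       private final PartitionKey partitionKey;
       private final Consumer<DataFile> onComplete;  // releases closed files to the task writer
       private FileAppender<T> current;
   
       RollingFileAppender(PartitionKey partitionKey, Consumer<DataFile> onComplete) {
         this.partitionKey = partitionKey;
         this.onComplete = onComplete;
         this.current = openNewFile(partitionKey);  // hypothetical: uses the file and appender factories
       }
   
       @Override
       public void close() throws IOException {
         if (current != null) {
           current.close();
           onComplete.accept(asDataFile(current, partitionKey));  // hypothetical conversion helper
           current = null;
         }
       }
     }
   ```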




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460585033



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    writer.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!writers.isEmpty()) {
+      Iterator<RollingFileAppender> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        iterator.next().close();
+        // Remove from the writers after closed.
+        iterator.remove();

Review comment:
       Many iterator classes don't implement `remove`. What about iterating over the key set separately instead?
   
   ```java
     if (!writers.isEmpty()) {
       // iterate a copy of the key set: calling remove() while iterating
       // keySet() directly would throw ConcurrentModificationException
       for (PartitionKey key : Lists.newArrayList(writers.keySet())) {
         RollingFileAppender writer = writers.remove(key);
         writer.close();
       }
     }
   ```




[GitHub] [iceberg] openinx commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-666994436


   @rdblue I've addressed all of the latest comments, thanks.


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460583529



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     RowDataReader dataReader = new RowDataReader(
         task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
 
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
-        properties, schema, SparkSchemaUtil.convert(schema));
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);

Review comment:
       Is this change needed? It looks non-functional.




[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459152830



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {

Review comment:
       I don't think it's a good idea to have a `poll` method like this one because it leaks critical state (`completedFiles`) and creates an opportunity for threading issues between `write` and `pollCompleteFiles`.
   
   Instead, I think the base implementation should use a push model, where each file is released as it is closed.
   
   ```java
     /**
      * Called when a data file is completed and no longer needed by the writer.
      */
     protected abstract void completedFile(DataFile file);
   ```
   
   Then `closeCurrent` would call `completedFile(dataFile)` and the implementation of `completedFile` would handle it from there.
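   
   For illustration, a hedged sketch of that push model inside `closeCurrent` (`asDataFile` is a hypothetical conversion helper):
   
   ```java
     private void closeCurrent() throws IOException {
       if (currentAppender != null) {
         currentAppender.close();
         // push the finished file immediately instead of buffering it for a poll
         completedFile(asDataFile(currentFile, currentAppender.metrics()));
         this.currentAppender = null;
       }
     }
   ```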




[GitHub] [iceberg] rdblue merged pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459194667



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {

Review comment:
       I read the `BaseWriter` code again and now see the difference. For the Spark streaming writer, once we do a `commit` we create a brand-new streaming writer for future records, so there is no need for a method like `pollCompleteFiles()` to poll newly added `DataFile`s continuously. In the current Iceberg Flink writer implementation, the same `TaskWriter` keeps writing records even after a checkpoint happens, which is why I designed `pollCompleteFiles` to fetch completed data files incrementally. It is a design difference; the state leak and threading issues you mention are not a problem in the current version, but I agree it would be easy for others to run into them if they are not careful. I can align with the current Spark design.
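   
   For context, a hedged sketch of the Flink-side usage I had in mind; `pollCompleteFiles()` is from this draft API and the surrounding method is illustrative:
   
   ```java
   // At each checkpoint the long-lived writer is drained incrementally
   // rather than replaced, unlike the Spark streaming writer.
   void onCheckpoint(TaskWriter<Row> writer, List<DataFile> pendingFiles) {
     pendingFiles.addAll(writer.pollCompleteFiles());
   }
   ```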




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459151343



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,

Review comment:
       If this is a refactor, I'd prefer to keep the existing structure the way it was, with `openCurrent`, `closeCurrent`, and `writeInternal`. That way we aren't introducing additional changes in this PR. If we want to refactor how writers work, we can do that separately.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460589483



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here to avoid messing up the keys in the writers map.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       As I think about it more, this is probably not a concern that needs to be considered in this PR. The purpose of this PR is to abstract the generic task writers to share between Flink and Spark.
   
   However, I would like to further discuss whether we should consider this issue for longer-running streaming programs in general, where a row matching a rarely seen predicate reaches a writing TaskManager and does not output its values / its file for a long time. Having read further through the docs and the existing code base, I don't think this could affect correctness, but I still think it might cause performance issues during scan planning when reading from the partitioned table.
   
   During scan planning, IIUC, an inclusive projection could match a very large number of rows that fall outside the predicate range if the `RollingFileAppender` for this rarely observed predicate at this TaskManager buffers its data for a very long time before writing (say days or even weeks in a longer-running streaming query).
   
   I definitely don't think this needs to be tackled in this PR, but I would like to discuss what we expect to happen in this situation and how downstream systems that read this table will handle it. To me, this is different from the Spark streaming writer, since that still uses the batch writer due to Spark's micro-batch processing.
   
   cc @JingsongLi to see if my concern here is well founded or if I'm simply misunderstanding Iceberg's intended behavior during read and write. It's possible that users relying only on the Blink planner are also still using micro-batches, which would cover my concern.
   
   I can attempt to come up with an example that further demonstrates my concern if need be, though I don't think it should block this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459165054



##########
File path: core/src/main/java/org/apache/iceberg/taskio/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Function<T, PartitionKey> keyGetter;

Review comment:
       Instead of passing a function, I think this should be an abstract method:
   
   ```java
     /**
      * Create a PartitionKey from the values in row.
      * <p>
      * Any PartitionKey returned by this method can be reused by the implementation.
      *
      * @param row a data row
      */
     protected abstract PartitionKey partition(T row);
   ```
   
   Passing a function is good if we need to inject behavior that might need to be customized, but here the only customization that would be required is to partition the objects that this class is already parameterized by. So it will be easier just to add a method for subclasses to implement. And that puts the responsibility on the implementation instead of on the code that constructs the writer.
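   
   For illustration, a subclass would look roughly like this (it mirrors the Flink `RowPartitionedFanoutWriter` that appears later in this thread):
   
   ```java
   private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {
     private final PartitionKey partitionKey;
     private final RowWrapper rowWrapper;
   
     RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
                                OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
       super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
       this.partitionKey = new PartitionKey(spec, schema);
       this.rowWrapper = new RowWrapper(schema.asStruct());
     }
   
     @Override
     protected PartitionKey partition(Row row) {
       // reuses one mutable key per task, as the Javadoc above permits
       partitionKey.partition(rowWrapper.wrap(row));
       return partitionKey;
     }
   }
   ```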




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463347911



##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -17,24 +17,42 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.sql.catalyst.InternalRow;
 
-class UnpartitionedWriter extends BaseWriter {
-  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileWriter currentWriter = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
 
-    openCurrent();
+  @Override
+  public void write(T record) throws IOException {
+    if (currentWriter == null) {
+      currentWriter = new RollingFileWriter(null);
+    }
+    currentWriter.add(record);
   }
 
   @Override
-  public void write(InternalRow row) throws IOException {
-    writeInternal(row);
+  public void close() throws IOException {
+    closeCurrent();
+  }
+
+  private void closeCurrent() throws IOException {

Review comment:
       Is this method needed? Why not merge it with `close`?
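   
   Something like this, as a hedged sketch of the merged method:
   
   ```java
   @Override
   public void close() throws IOException {
     if (currentWriter != null) {
       currentWriter.close();
       currentWriter = null;
     }
   }
   ```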




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463345937



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> complete() throws IOException {
+    close();
+
+    if (completedFiles.size() > 0) {
+      return ImmutableList.copyOf(completedFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected class RollingFileWriter implements Closeable {
+    private static final int ROWS_DIVISOR = 1000;
+    private final PartitionKey partitionKey;
+
+    private EncryptedOutputFile currentFile = null;
+    private FileAppender<T> currentAppender = null;
+    private long currentRows = 0;
+
+    public RollingFileWriter(PartitionKey partitionKey) {
+      this.partitionKey = partitionKey;
+    }
+
+    public void add(T record) throws IOException {
+      if (currentAppender == null) {
+        openCurrent();
+      }
+
+      this.currentAppender.add(record);
+      this.currentRows++;
+
+      if (shouldRollToNewFile()) {
+        closeCurrent();
+      }
+    }
+
+    private void openCurrent() {
+      if (spec.fields().size() == 0) {

Review comment:
       Unpartitioned writers pass a null partition key. Would it make more sense to use that instead of using `spec`?
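   
   For example, a hedged sketch of that alternative, assuming `OutputFileFactory` exposes both a no-arg and a keyed `newOutputFile` overload as elsewhere in this PR:
   
   ```java
   private void openCurrent() {
     if (partitionKey == null) {
       // unpartitioned writers pass a null key, so no spec check is needed
       this.currentFile = fileFactory.newOutputFile();
     } else {
       this.currentFile = fileFactory.newOutputFile(partitionKey);
     }
     this.currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
     this.currentRows = 0;
   }
   ```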




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460586355



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {

Review comment:
       I think this is fine, but you might want to move this into Flink and combine it with the Flink-specific writer. There are a lot of concerns that might need to change for this class, like using a LRU cache for writers, incrementally releasing files, etc. Since this is only used by Flink, we might just want to iterate on it there instead of trying to maintain this as an independent class. We can always bring it back out when we have an additional use case.
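   
   For example, a hedged sketch of the LRU idea using `LinkedHashMap`'s access-order mode; the bound and the close-on-evict handling are illustrative assumptions, not part of this PR:
   
   ```java
   private static final int MAX_OPEN_WRITERS = 100;  // illustrative bound
   
   private final Map<PartitionKey, RollingFileAppender> writers =
       new LinkedHashMap<PartitionKey, RollingFileAppender>(16, 0.75f, true) {
         @Override
         protected boolean removeEldestEntry(Map.Entry<PartitionKey, RollingFileAppender> eldest) {
           if (size() > MAX_OPEN_WRITERS) {
             try {
               eldest.getValue().close();  // releases the eldest writer's completed file
             } catch (IOException e) {
               throw new UncheckedIOException(e);
             }
             return true;
           }
           return false;
         }
       };
   ```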




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463346947



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
##########
@@ -226,9 +229,15 @@ public String toString() {
     return String.format("IcebergWrite(table=%s, format=%s)", table, format);
   }
 
-  public static class TaskCommit extends TaskResult implements WriterCommitMessage {
-    TaskCommit(TaskResult result) {
-      super(result.files());
+  public static class TaskCommit implements WriterCommitMessage {
+    private final List<DataFile> taskFiles;

Review comment:
       Same here, this class should use an Array of data files.
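   
   That is, something along these lines (hedged sketch):
   
   ```java
   public static class TaskCommit implements WriterCommitMessage {
     private final DataFile[] taskFiles;
   
     TaskCommit(List<DataFile> files) {
       // an array serializes without depending on the List implementation
       this.taskFiles = files.toArray(new DataFile[0]);
     }
   
     DataFile[] files() {
       return taskFiles;
     }
   }
   ```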




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460634798



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here to avoid messing up the keys in the writers map.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    writer.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!writers.isEmpty()) {
+      Iterator<RollingFileAppender> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        iterator.next().close();
+        // Remove from the writers after closed.
+        iterator.remove();

Review comment:
       OK, sounds good.

##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> complete() throws IOException {
+    close();
+
+    if (completedFiles.size() > 0) {
+      return ImmutableList.copyOf(completedFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  class RollingFileAppender implements Closeable {

Review comment:
       Well, sounds great.

##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileAppender currentAppender = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  @Override
+  public void write(T record) throws IOException {
+    if (currentAppender == null) {
+      currentAppender = new RollingFileAppender(null);

Review comment:
       I refactored this part because we don't need to initialize a real writer if no records come in. Before this patch, it would open a real file writer even when there were no records to write, and at the end we would need to close that useless writer and clean up its file.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     RowDataReader dataReader = new RowDataReader(
         task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
 
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
-        properties, schema, SparkSchemaUtil.convert(schema));
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);

Review comment:
       OK, let me revert this.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.flink.data.FlinkAvroWriter;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.parquet.Parquet;
+
+class TaskWriterFactory {
+  private TaskWriterFactory() {
+  }
+
+  static TaskWriter<Row> createTaskWriter(Schema schema,
+                                          PartitionSpec spec,
+                                          FileFormat format,
+                                          FileAppenderFactory<Row> appenderFactory,
+                                          OutputFileFactory fileFactory,
+                                          FileIO io,
+                                          long targetFileSizeBytes) {
+    if (spec.fields().isEmpty()) {
+      return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes);
+    } else {
+      return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory,
+          io, targetFileSizeBytes, schema);
+    }
+  }
+
+  private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {
+
+    private final PartitionKey partitionKey;
+    private final RowWrapper rowWrapper;
+
+    RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
+                               OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      this.partitionKey = new PartitionKey(spec, schema);
+      this.rowWrapper = new RowWrapper(schema.asStruct());
+    }
+
+    @Override
+    protected PartitionKey partition(Row row) {
+      partitionKey.partition(rowWrapper.wrap(row));
+      return partitionKey;
+    }
+  }
+
+  static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> {
+    private final Schema schema;
+    private final Map<String, String> props;
+
+    FlinkFileAppenderFactory(Schema schema, Map<String, String> props) {
+      this.schema = schema;
+      this.props = props;
+    }
+
+    @Override
+    public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {

Review comment:
       I see those PRs. I'd prefer to keep the current version so that we can introduce the Flink unit tests to cover this big change; changing it to RowData should be easy in the future, I think.

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -250,33 +253,42 @@ public String toString() {
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
-        return new Partitioned24Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
+            targetFileSize, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+  private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow>
+      implements DataWriter<InternalRow> {
     Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                           OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      return new TaskCommit(complete());
+      this.close();
+
+      List<DataFile> dataFiles = complete();
+      return new TaskCommit(new TaskResult(dataFiles));

Review comment:
       OK

##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here to avoid messing up the keys in the writers map.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       > During scan planning, IIUC, an inclusive projection could possibly match a very large number of rows that might fall outside of the predicate range if the RollingFileAppender for this rarely observed predicate at this Task Manager buffers its data for a very long time before writing (say days or even weeks in a longer running streaming query).
   
    You mean the Flink streaming reader won't see buffered data that has not yet been committed to the Iceberg table? Actually, that's exactly the expected behavior. Say we have a data pipeline:
   
   ```
   (flink-streaming-sink-job-A) -> (iceberg table) -> (flink-streaming-reader-job-B). 
   ```
   
    The upstream `flink-streaming-sink-job-A` appends records to the Iceberg table continuously and commits to the table when a checkpoint happens. We need to guarantee transactional semantics, so the downstream Flink streaming reader can only see committed Iceberg data; the delta between two contiguous snapshots is the incremental data that the streaming reader should consume.
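    
    For reference, the downstream incremental consumption could look roughly like this (a hedged sketch using Iceberg's incremental scan API):
    
    ```java
    // Read only the appends committed between the two snapshots the reader has seen.
    TableScan incremental = table.newScan()
        .appendsBetween(lastConsumedSnapshotId, currentSnapshotId);
    ```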
   
   

##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
##########
@@ -17,41 +17,44 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PartitionedWriter extends BaseWriter {
+public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
-  private final PartitionKey key;
-  private final InternalRowWrapper wrapper;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
-  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+  private PartitionKey currentKey = null;
+  private RollingFileAppender currentAppender = null;

Review comment:
       Nice finding. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r463346866



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -250,32 +259,39 @@ public String toString() {
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
-        return new Partitioned24Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
+            targetFileSize, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+  private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow>
+      implements DataWriter<InternalRow> {
     Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                           OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
+      this.close();
+
       return new TaskCommit(complete());
     }
   }
 
-  private static class Partitioned24Writer extends PartitionedWriter implements DataWriter<InternalRow> {
+  private static class Partitioned24Writer extends SparkPartitionedWriter implements DataWriter<InternalRow> {
+
     Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                               OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, Schema writeSchema) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, writeSchema);
+                        OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
+                        Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema, sparkSchema);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
+      this.close();

Review comment:
       No need to use the prefix `this` for close calls, is there?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r458265385



##########
File path: core/src/main/java/org/apache/iceberg/taskio/FileAppenderFactory.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+
+public interface FileAppenderFactory<T> {

Review comment:
       This will need Javadoc.
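   
   For example, a hedged sketch of possible Javadoc (not the wording that was merged):
   
   ```java
   /**
    * A factory to create {@link FileAppender} instances for a given row type.
    *
    * @param <T> the Java type of rows the appenders accept
    */
   public interface FileAppenderFactory<T> {
     /**
      * Create a new {@link FileAppender} that writes rows to the given output file.
      *
      * @param outputFile the file to write to
      * @param format the file format to write
      * @return a new file appender
      */
     FileAppender<T> newAppender(OutputFile outputFile, FileFormat format);
   }
   ```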




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#issuecomment-661365934


   I'm assuming that this one should go in before #1145 and will review this one next. If that's not the case, please let me know!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459153998



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,

Review comment:
       If there is concern about `openCurrent()` containing code specific to subclasses, we could change it to accept an `EncryptedOutputFile` when it is called. Then `BaseWriter` wouldn't need any logic other than rolling over files that get too large. That would probably require a factory method for `EncryptedOutputFile`, though, since `openCurrent()` is also called in `writeInternal`. It's probably easier to leave this as it is for now, to keep the change easy to review and get in.
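
       A rough sketch of that alternative (the abstract factory method and the revised `openCurrent` signature are hypothetical, not code from this PR):

   ```java
     // hypothetical: subclasses supply the output file, keeping the base class generic
     protected abstract EncryptedOutputFile newOutputFile();

     private void openCurrent(EncryptedOutputFile outputFile) {
       // the base class only creates the appender; rolling when the file grows
       // too large stays here as well
       this.currentAppender = appenderFactory.newAppender(outputFile.encryptingOutputFile(), format);
     }
   ```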






[GitHub] [iceberg] openinx commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459197243



##########
File path: core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> pollCompleteFiles() {
+    if (completedFiles.size() > 0) {
+      List<DataFile> dataFiles = ImmutableList.copyOf(completedFiles);
+      completedFiles.clear();
+      return dataFiles;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  protected OutputFileFactory outputFileFactory() {
+    return this.fileFactory;
+  }
+
+  WrappedFileAppender createWrappedFileAppender(PartitionKey partitionKey,
+                                                Supplier<EncryptedOutputFile> outputFileSupplier) {
+    EncryptedOutputFile outputFile = outputFileSupplier.get();
+    FileAppender<T> appender = appenderFactory.newAppender(outputFile.encryptingOutputFile(), format);
+    return new WrappedFileAppender(partitionKey, outputFile, appender);
+  }
+
+  class WrappedFileAppender {
+    private final PartitionKey partitionKey;
+    private final EncryptedOutputFile encryptedOutputFile;
+    private final FileAppender<T> appender;
+
+    private boolean closed = false;
+    private long currentRows = 0;
+
+    WrappedFileAppender(PartitionKey partitionKey, EncryptedOutputFile encryptedOutputFile, FileAppender<T> appender) {
+      this.partitionKey = partitionKey;
+      this.encryptedOutputFile = encryptedOutputFile;
+      this.appender = appender;
+    }
+
+    void add(T record) {
+      this.appender.add(record);
+      this.currentRows++;
+    }
+
+    boolean shouldRollToNewFile() {
+      // TODO: ORC appenders do not currently support checking the target file size before the file is closed

Review comment:
       Could we address the ORC issue you described in a separate issue? I think we should keep this PR focused on the writer refactor.






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460585033



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: copy the partition key here, because the original key is reused and mutating it would corrupt the keys already stored in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    writer.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!writers.isEmpty()) {
+      Iterator<RollingFileAppender> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        iterator.next().close();
+        // Remove from the writers after closed.
+        iterator.remove();

Review comment:
       Many iterator classes don't implement `remove`. What about iterating over a copy of the key set instead? Copying the keys first also avoids a `ConcurrentModificationException` from removing map entries mid-traversal.
   
   ```java
     if (!writers.isEmpty()) {
       // iterate over a copy of the keys so that removing entries does not
       // invalidate the traversal
       for (PartitionKey key : Lists.newArrayList(writers.keySet())) {
         RollingFileAppender writer = writers.remove(key);
         writer.close();
       }
     }
   ```






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460583813



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -250,33 +253,42 @@ public String toString() {
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
-        return new Partitioned24Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
+            targetFileSize, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+  private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow>
+      implements DataWriter<InternalRow> {
     Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                           OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      return new TaskCommit(complete());
+      this.close();
+
+      List<DataFile> dataFiles = complete();
+      return new TaskCommit(new TaskResult(dataFiles));

Review comment:
       If `complete` doesn't produce a `TaskResult`, then I'm not sure we need `TaskResult` at all anymore. Could we just construct `TaskCommit` directly?
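
       For example, assuming `TaskCommit` gained a constructor that accepts the data files directly (a hypothetical signature, shown only to illustrate the suggestion):

   ```java
       @Override
       public WriterCommitMessage commit() throws IOException {
         close();

         // hypothetical TaskCommit(List<DataFile>) constructor, replacing the
         // intermediate TaskResult wrapper
         return new TaskCommit(complete());
       }
   ```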






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459154619



##########
File path: core/src/main/java/org/apache/iceberg/taskio/FileAppenderFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.taskio;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+
+/**
+ * Factory to create a new {@link FileAppender} to write records.
+ *
+ * @param <T> data type of the rows to append.
+ */
+public interface FileAppenderFactory<T> {
+
+  /**
+   * Create a new {@link FileAppender}.
+   *
+   * @param outputFile indicate the file location to write.

Review comment:
       Minor: I think the Javadoc for arguments should describe the argument's purpose, like `an OutputFile used to create an output stream`. If the purpose is clear from the expected type, then keeping it simple is fine, like `an OutputFile`.
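
       Applied to this interface, that might read (the parameter names and wording are only a suggestion):

   ```java
     /**
      * Create a new {@link FileAppender}.
      *
      * @param outputFile an OutputFile used to create an output stream
      * @param fileFormat a FileFormat that determines the layout of the file
      * @return a newly created FileAppender
      */
     FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
   ```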






[GitHub] [iceberg] rdblue commented on a change in pull request #1213: Abstract the generic task writers for sharing the common codes between spark and flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r459165341



##########
File path: core/src/main/java/org/apache/iceberg/taskio/PartitionedWriter.java
##########
@@ -17,41 +17,40 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.taskio;
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PartitionedWriter extends BaseWriter {
+public class PartitionedWriter<T> extends BaseTaskWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
-  private final PartitionKey key;
-  private final InternalRowWrapper wrapper;
+  private final Function<T, PartitionKey> keyGetter;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
-  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+  private PartitionKey currentKey = null;
+  private WrappedFileAppender currentAppender = null;
+
+  public PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                           Function<T, PartitionKey> keyGetter) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
-    this.key = new PartitionKey(spec, writeSchema);
-    this.wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(writeSchema));
+    this.keyGetter = keyGetter;

Review comment:
       Like the other partitioned writer, I think this should use an abstract method to be implemented by subclasses.
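
       For reference, `PartitionedFanoutWriter` in this PR already uses that shape, which this writer could mirror:

   ```java
     /**
      * Create a PartitionKey from the values in row.
      * <p>
      * Any PartitionKey returned by this method can be reused by the implementation.
      *
      * @param row a data row
      */
     protected abstract PartitionKey partition(T row);
   ```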



