Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/20 16:42:16 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3158: Core: Add new rolling file writers

aokolnychyi opened a new pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158


   This PR adds new rolling writers and contains a subset of changes in PR #2945.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#issuecomment-923202470


   Looks good to me. I had a couple comments, but nothing blocking.




[GitHub] [iceberg] aokolnychyi commented on pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#issuecomment-923100808


   cc @openinx @stevenzwu @RussellSpitzer @rdblue @kbendick @karuppayya @flyrain @pvary @jackye1995 @yyanyy @szehon-ho @rymurr @jun-he




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712491746



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition

Review comment:
       Fixed.

##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentFileRows() {
+    return currentFileRows;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentFileRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      initCurrentWriter();
+    }
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  protected void initCurrentWriter() {

Review comment:
       Fixed.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712341773



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+public class RollingEqualityDeleteWriter<T> extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {

Review comment:
       Delete writers are in `org.apache.iceberg.deletes` while all other writers are in `org.apache.iceberg.io`.
   I think it makes sense to have writer-related classes in the `io` package so I added rolling writers there.






[GitHub] [iceberg] rdblue commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712438004



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentFileRows() {
+    return currentFileRows;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentFileRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      initCurrentWriter();
+    }
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  protected void initCurrentWriter() {

Review comment:
       Why change this to `init` from `open`?
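
For readers following the hunk above: `write` rolls to a new file by calling `closeCurrentWriter()` and then `initCurrentWriter()` once `shouldRollToNewFile()` passes. A minimal sketch of that check, assuming a fixed number of bytes per row and assuming the row counter is reset when a new file is opened (the hunk is cut off before it shows this). `RollCheckSketch`, `bytesPerRow`, `closedFiles` and `lastClosedFileLength` are hypothetical names for illustration, not part of the PR:

```java
// Hypothetical stand-in for illustration only; this is not the Iceberg class.
final class RollCheckSketch {
  // Same constant as in the hunk: the size check runs only every 1000 rows.
  private static final int ROWS_DIVISOR = 1000;

  private final long targetFileSizeInBytes;
  private final long bytesPerRow; // assumption: every row adds the same number of bytes

  private long currentFileRows = 0;
  private long currentLength = 0;
  private int closedFiles = 0;
  private long lastClosedFileLength = 0;

  RollCheckSketch(long targetFileSizeInBytes, long bytesPerRow) {
    this.targetFileSizeInBytes = targetFileSizeInBytes;
    this.bytesPerRow = bytesPerRow;
  }

  // Mirrors the shape of write(T row): append, count, then maybe roll.
  void write() {
    currentLength += bytesPerRow;
    currentFileRows++;

    if (shouldRollToNewFile()) {
      // Stands in for closeCurrentWriter() followed by initCurrentWriter().
      lastClosedFileLength = currentLength;
      closedFiles++;
      currentFileRows = 0; // assumption: opening a new file resets the counter
      currentLength = 0;
    }
  }

  private boolean shouldRollToNewFile() {
    return currentFileRows % ROWS_DIVISOR == 0 && currentLength >= targetFileSizeInBytes;
  }

  int closedFiles() {
    return closedFiles;
  }

  long currentFileRows() {
    return currentFileRows;
  }

  long lastClosedFileLength() {
    return lastClosedFileLength;
  }
}
```

With a 1500-byte target and 1-byte rows, the first roll in this sketch happens at row 2000 rather than row 1500, because the length is only consulted on multiples of `ROWS_DIVISOR`. The target therefore acts as a lower bound on when a roll can happen, not as an exact file size.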






[GitHub] [iceberg] rdblue commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712435910



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition

Review comment:
       Nit: "data/deletes" isn't very clear. It would be better to say "data or deletes", since they can't be mixed.






[GitHub] [iceberg] aokolnychyi commented on pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#issuecomment-923374158


   Thanks for reviewing, @rdblue!




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712490516



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {

Review comment:
       I mean all classes that extend `RollingFileWriter` init the writer immediately so we shouldn't worry about the current file being null.






[GitHub] [iceberg] rdblue commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712437430



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {

Review comment:
       Sounds good to me. You mean the CDC writer constructor?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712427447



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {

Review comment:
       I removed the precondition here that the current file is not null. We will call this method for every single row while writing CDC records. Right now, `currentFile` is never null as we initialize it in the constructor.
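
The pattern described in this comment can be sketched as follows, assuming each concrete writer opens its first file from its own constructor so that the accessor never observes a null file. This is a simplified illustration, not the PR's code: `EagerRollingSketch`, `EagerDataWriterSketch`, `newFileLocation` and the `.parquet` naming are hypothetical, and a plain `String` stands in for `EncryptedOutputFile`.

```java
// Hypothetical stand-ins for illustration only; these are not the Iceberg classes.
abstract class EagerRollingSketch {
  private String currentFile = null;
  private int filesOpened = 0;

  // Hypothetical hook: returns the location of the next output file.
  protected abstract String newFileLocation(int ordinal);

  // Opens the next file; subclasses call this once from their constructors.
  protected void initCurrentWriter() {
    this.currentFile = newFileLocation(filesOpened);
    this.filesOpened += 1;
  }

  // May be called once per row, so it carries no null check.
  public CharSequence currentFilePath() {
    return currentFile;
  }
}

final class EagerDataWriterSketch extends EagerRollingSketch {
  private final String prefix;

  EagerDataWriterSketch(String prefix) {
    this.prefix = prefix;
    // Initialize only after the subclass fields are assigned, since the hook reads them.
    initCurrentWriter();
  }

  @Override
  protected String newFileLocation(int ordinal) {
    return prefix + "-" + ordinal + ".parquet";
  }
}
```

In this sketch, `new EagerDataWriterSketch("data").currentFilePath()` already refers to `data-0.parquet` before any row is written. The trade-off is that the non-null guarantee depends on every subclass remembering to call `initCurrentWriter()` in its constructor.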






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712341773



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+public class RollingEqualityDeleteWriter<T> extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {

Review comment:
       Simple delete writers are in `org.apache.iceberg.deletes` while all other writers are in `org.apache.iceberg.io`.
   I think it makes sense to have writer-related classes in the `io` package so I added rolling writers there.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712428064



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentFileRows() {
+    return currentFileRows;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentFileRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      initCurrentWriter();
+    }
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  protected void initCurrentWriter() {
+    Preconditions.checkState(currentWriter == null, "Current writer has been already initialized");
+
+    this.currentFile = newFile();
+    this.currentFileRows = 0;
+    this.currentWriter = newWriter(currentFile);
+  }
+
+  private EncryptedOutputFile newFile() {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(spec, partition);

Review comment:
       @rdblue, I've updated this place to pass `spec` like we discussed in the original PR.
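
       For readers following the quoted diff, the roll condition in `shouldRollToNewFile()` can be sketched in isolation. This is only an illustrative sketch, not the Iceberg implementation: `RollingSketch`, its byte counter, and its `close()` behavior are hypothetical stand-ins, and only the `ROWS_DIVISOR` value and the shape of the check are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a rolling writer; it only counts rows and bytes.
public class RollingSketch {
  // Same value as ROWS_DIVISOR in the quoted diff: the size check runs every 1000 rows.
  static final int ROWS_DIVISOR = 1000;

  private final long targetFileSizeInBytes;
  private final List<Long> closedFileRowCounts = new ArrayList<>();
  private long currentFileRows = 0;
  private long currentFileBytes = 0;

  RollingSketch(long targetFileSizeInBytes) {
    this.targetFileSizeInBytes = targetFileSizeInBytes;
  }

  void write(byte[] row) {
    currentFileBytes += row.length;
    currentFileRows++;

    // Mirrors the check in the diff: look at the size only on every 1000th row.
    if (currentFileRows % ROWS_DIVISOR == 0 && currentFileBytes >= targetFileSizeInBytes) {
      roll();
    }
  }

  // Assumption: closing flushes the last, partially filled file if it has rows.
  void close() {
    if (currentFileRows > 0) {
      roll();
    }
  }

  private void roll() {
    closedFileRowCounts.add(currentFileRows);
    currentFileRows = 0;
    currentFileBytes = 0;
  }

  List<Long> closedFileRowCounts() {
    return closedFileRowCounts;
  }

  public static void main(String[] args) {
    RollingSketch writer = new RollingSketch(1500); // target of 1500 bytes
    for (int i = 0; i < 2500; i++) {
      writer.write(new byte[1]); // one-byte rows
    }
    writer.close();
    System.out.println(writer.closedFileRowCounts()); // prints [2000, 500]
  }
}
```

       With one-byte rows and a 1500-byte target, the first file is closed at 2000 rows rather than 1500, because the size is only inspected on row counts divisible by 1000. The target therefore acts as a soft threshold rather than a hard cap.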






[GitHub] [iceberg] aokolnychyi merged pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158


   




[GitHub] [iceberg] rdblue commented on a change in pull request #3158: Core: Add new rolling file writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3158:
URL: https://github.com/apache/iceberg/pull/3158#discussion_r712492715



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {

Review comment:
       Yeah, I saw that. I don't have a good way around this, but at least we know that it will fail quickly without that init call.
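
       The fail-fast behavior mentioned here follows from the quoted diff: `currentWriter` and `currentFile` start as `null`, and `initCurrentWriter()` is not called by the base constructor, so a subclass that skips it hits a `NullPointerException` on its first `write` or `currentFilePath()` call. Below is a minimal sketch of that pattern. All class names are hypothetical and a `StringBuilder` stands in for the real writer. One plausible reason to leave the init call to subclasses (an assumption, not something stated in this thread) is that `newWriter` may read subclass fields that are not yet assigned while the base constructor runs.

```java
public class FailFastSketch {
  // Hypothetical base class: the writer is created lazily by an explicit init call.
  abstract static class Base {
    private StringBuilder currentWriter = null;

    protected abstract StringBuilder newWriter();

    protected void initCurrentWriter() {
      if (currentWriter != null) {
        throw new IllegalStateException("Current writer has already been initialized");
      }
      this.currentWriter = newWriter();
    }

    void write(String row) {
      // Throws NullPointerException if initCurrentWriter() was never called.
      currentWriter.append(row);
    }
  }

  // Calls init at the end of its constructor, after its own fields are set.
  static class Initialized extends Base {
    private final String prefix;

    Initialized(String prefix) {
      this.prefix = prefix;
      initCurrentWriter();
    }

    @Override
    protected StringBuilder newWriter() {
      return new StringBuilder(prefix);
    }
  }

  // Forgets the init call; the mistake surfaces on the first write.
  static class Forgetful extends Base {
    @Override
    protected StringBuilder newWriter() {
      return new StringBuilder();
    }
  }

  static boolean failsOnFirstWrite(Base writer) {
    try {
      writer.write("row");
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(failsOnFirstWrite(new Initialized("p-"))); // prints false
    System.out.println(failsOnFirstWrite(new Forgetful()));       // prints true
  }
}
```

       The failure is not caught at compile time, but it shows up on the very first row, so a subclass that misses the init call cannot go unnoticed for long.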



