Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/26 14:15:24 UTC

[GitHub] [iceberg] openinx opened a new pull request #1663: Flink: write the CDC records into apache iceberg tables

openinx opened a new pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663


   This patch addresses issue https://github.com/apache/iceberg/issues/1639. It is still a work in progress (PoC).
   
   So far, I have finished a few pieces of abstraction work:
   
   1. Abstracted the `BaseTaskWriter` so that it can accept both INSERT and EQUALITY-DELETE records.
   2. Abstracted a `ContentFileWriter` interface, so that the `DataFile`, `EqualityDeleteFile`, and `PosDeleteFile` writers can implement it and provide a unified `write` entry point (a rough sketch is shown below).
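
   For reference, here is a rough, illustrative sketch of the unified `write` entry point described in item 2. It is based on the writer calls that show up in the review threads below (`write`, `length`, `toContentFile`), not the exact interface committed in this patch:

   ```java
   import java.io.Closeable;

   // Illustrative sketch only: T is the produced content file (DataFile or DeleteFile),
   // R is the record type handled by the concrete writer.
   public interface ContentFileWriter<T, R> extends Closeable {
     // Write one record: an insert row, an equality-delete row, or a PositionDelete,
     // depending on the concrete writer.
     void write(R record);

     // Bytes written so far, used by the rolling writer to decide when to roll.
     long length();

     // Build the DataFile/DeleteFile metadata once the writer has been closed.
     T toContentFile();
   }
   ```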


[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526882235



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;
+        }
+      });
+    }
+
+    dataWriter.write(row);
+  }
+
+  @Override
+  public void writeEqualityDelete(T equalityDelete) throws IOException {
+    if (!enableEqualityDelete()) {
+      throw new UnsupportedOperationException("Could not accept equality deletion.");
+    }
+
+    List<FilePos> existing = insertedRowMap.get(structLikeFun.apply(equalityDelete));
+
+    if (existing == null) {
+      // Delete a row that was written by another, already completed delta writer.
+      equalityDeleteWriter.write(equalityDelete);
+    } else {
+      // Delete the rows that were written by the current delta writer.
+      for (FilePos filePos : existing) {
+        posDeleteWriter.write(positionDelete.set(filePos.path(), filePos.pos(), null));
+      }
+    }
+  }
+
+  @Override
+  public void writePosDelete(CharSequence path, long offset, T row) throws IOException {
+    if (!enablePosDelete()) {
+      throw new UnsupportedOperationException("Could not accept position deletion.");
+    }
+
+    posDeleteWriter.write(positionDelete.set(path, offset, row));
+  }
+
+  @Override
+  public void abort() {
+    if (dataWriter != null) {
+      try {
+        dataWriter.abort();
+      } catch (IOException e) {
+        LOG.warn("Failed to abort the data writer {} because: ", dataWriter, e);

Review comment:
       I'm still thinking about how to handle this gracefully.




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523327731



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;
+        }
+      });
+    }
+
+    dataWriter.write(row);
+  }
+
+  @Override
+  public void writeEqualityDelete(T equalityDelete) throws IOException {
+    if (!enableEqualityDelete()) {
+      throw new UnsupportedOperationException("Could not accept equality deletion.");
+    }
+
+    List<FilePos> existing = insertedRowMap.get(structLikeFun.apply(equalityDelete));
+
+    if (existing == null) {
+      // Delete a row that was written by another, already completed delta writer.
+      equalityDeleteWriter.write(equalityDelete);
+    } else {
+      // Delete the rows that were written by the current delta writer.
+      for (FilePos filePos : existing) {
+        posDeleteWriter.write(positionDelete.set(filePos.path(), filePos.pos(), null));
+      }
+    }
+  }
+
+  @Override
+  public void writePosDelete(CharSequence path, long offset, T row) throws IOException {
+    if (!enablePosDelete()) {
+      throw new UnsupportedOperationException("Could not accept position deletion.");
+    }
+
+    posDeleteWriter.write(positionDelete.set(path, offset, row));
+  }
+
+  @Override
+  public void abort() {
+    if (dataWriter != null) {
+      try {
+        dataWriter.abort();
+      } catch (IOException e) {
+        LOG.warn("Failed to abort the data writer {} because: ", dataWriter, e);

Review comment:
       I think abort should accept an optional exception, so that the suppressed exceptions can be added to it rather than just logged.
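
   A minimal sketch of that idea, assuming `abort` takes the original failure as a parameter (the final API in this PR may differ):

   ```java
   // Sketch: attach cleanup failures to the original exception as suppressed exceptions
   // instead of only logging them. This is a fragment of the abort path above.
   public void abort(Throwable cause) {
     try {
       dataWriter.abort();
     } catch (IOException e) {
       if (cause != null) {
         cause.addSuppressed(e);
       } else {
         LOG.warn("Failed to abort the data writer {}", dataWriter, e);
       }
     }
   }
   ```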




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523301692



##########
File path: core/src/main/java/org/apache/iceberg/deletes/DeletesUtil.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+public class DeletesUtil {
+
+  private DeletesUtil() {
+  }
+
+  public static Schema posDeleteSchema(Schema rowSchema) {
+    if (rowSchema == null) {
+      return new Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS);
+    } else {
+      // the appender uses the row schema wrapped with position fields
+      return new Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          Types.NestedField.optional(

Review comment:
       This should be required, not optional. If the row is included, it must always be included to avoid stats that do not match.
   
   Looks like this was a problem before as well.
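
   A sketch of the `required` variant being suggested. The field id below is a placeholder, since the quoted diff is truncated before the arguments of `Types.NestedField.optional(...)`:

   ```java
   // Placeholder id for the nested "row" column; the real id is elided in the diff above.
   private static final int ROW_FIELD_ID_PLACEHOLDER = 2147483540;

   public static Schema posDeleteSchema(Schema rowSchema) {
     if (rowSchema == null) {
       return new Schema(
           MetadataColumns.DELETE_FILE_PATH,
           MetadataColumns.DELETE_FILE_POS);
     } else {
       // The row column is required whenever a row schema is provided, so that the
       // written stats always cover the row contents.
       return new Schema(
           MetadataColumns.DELETE_FILE_PATH,
           MetadataColumns.DELETE_FILE_POS,
           Types.NestedField.required(ROW_FIELD_ID_PLACEHOLDER, "row", rowSchema.asStruct()));
     }
   }
   ```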




[GitHub] [iceberg] rdblue commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-718060632


   Awesome, thanks for working on this, @openinx! I'll take a look as soon as I get some time.


[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r528287157



##########
File path: core/src/main/java/org/apache/iceberg/ContentFileWriter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface ContentFileWriter<T, R> extends Closeable {

Review comment:
       I read the PR; here are my thoughts:

   First, I like the idea of using a single `FileAppenderFactory` to customize different writers for different computing engines; it's unified and elegant, and developers should find it easy to understand and extend.

   Second, I agree that we need to consider the sort order for the position delete writer. We have to sort the <path, pos> pairs in memory (similar to how HBase flushes a sorted memstore to HFiles); once the buffered size reaches the threshold, we flush it to a pos-delete file, so the rolling policy is driven by memory size rather than the current file size. It sounds reasonable to make this a separate pos-delete writer (see the sketch below).

   Third, I'd like to finish the whole CDC write path (PoC) based on #1802 to see whether there are other issues.

   Thanks.
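
   Here is a rough, illustrative sketch of the buffering idea in the second point. All names and the count-based threshold are assumptions, not the writer added by this PR or by #1802:

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   import java.util.function.BiConsumer;

   // Collect (path, pos) delete pairs in memory, keep them grouped by file path, and
   // flush them in sorted order once the buffer grows past a threshold.
   class SortedPosDeleteBuffer {
     private final NavigableMap<CharSequence, List<Long>> deletesByPath =
         new TreeMap<>(Comparator.comparing(CharSequence::toString));
     private int bufferedCount = 0;

     void delete(CharSequence path, long pos) {
       deletesByPath.computeIfAbsent(path, p -> new ArrayList<>()).add(pos);
       bufferedCount++;
     }

     // The number of buffered entries stands in for memory size here.
     boolean shouldFlush(int threshold) {
       return bufferedCount >= threshold;
     }

     // Emit deletes ordered by file path, then by position within each file.
     void flushTo(BiConsumer<CharSequence, Long> posDeleteWriter) {
       deletesByPath.forEach((path, positions) -> {
         positions.sort(Long::compareTo);
         positions.forEach(pos -> posDeleteWriter.accept(path, pos));
       });
       deletesByPath.clear();
       bufferedCount = 0;
     }
   }
   ```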




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523326160



##########
File path: core/src/main/java/org/apache/iceberg/io/DeltaWriter.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A writer interface that accepts INSERT, POS-DELETE, and EQUALITY-DELETE records. It usually writes those
+ * operations within a given partition or bucket.
+ */
+public interface DeltaWriter<T> extends Closeable {
+
+  /**
+   * Write the insert record.
+   */
+  void writeRow(T row) throws IOException;

Review comment:
       I don't think Iceberg classes should throw `IOException` like this. The `FileAppender` interface only throws `IOException` through `Closeable`, which is inherited.
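
   If the interface drops `throws IOException`, an implementation would presumably wrap I/O failures in an unchecked exception instead. A minimal sketch, using the JDK `UncheckedIOException` for illustration:

   ```java
   // Sketch: keep the DeltaWriter interface free of checked IOException and wrap
   // failures from the underlying writer instead.
   @Override
   public void writeRow(T row) {
     try {
       dataWriter.write(row);
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to write row", e);
     }
   }
   ```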




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523319683



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
       Do we expect multiple inserts for the same key? I would expect that we do not.
   
   If that's the case, then the suggestion below to use `StructLikeMap<FilePos>` could simplify this logic. When a row is deleted by position (added to `deletePositions`), it could be removed from the map. Then this logic could be:
   
   ```java
     StructLike key = structLikeFun.apply(row);
     FilePos previous = insertedRowMap.put(key, filePos);
     ValidationException.check(previous == null, "Detected duplicate insert for %s", key);
   ```




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523325036



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingContentFileWriter.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentFileWriter;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.util.Tasks;
+
+public class RollingContentFileWriter<ContentFileT, T> implements Closeable {
+
+  private static final int ROWS_DIVISOR = 1000;
+  private final PartitionKey partitionKey;
+  private final FileFormat format;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final ContentFileWriterFactory<ContentFileT, T> writerFactory;
+
+  private final WriterResult.Builder resultBuilder;
+
+  private EncryptedOutputFile currentFile = null;
+  private ContentFileWriter<ContentFileT, T> currentFileWriter = null;
+  private long currentRows = 0;
+
+  public RollingContentFileWriter(PartitionKey partitionKey, FileFormat format,
+                                  OutputFileFactory fileFactory, FileIO io,
+                                  long targetFileSize, ContentFileWriterFactory<ContentFileT, T> writerFactory) {
+    this.partitionKey = partitionKey;
+    this.format = format;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.writerFactory = writerFactory;
+
+    this.resultBuilder = WriterResult.builder();
+
+    openCurrent();
+  }
+
+  public CharSequence currentPath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentPos() {
+    return currentRows;
+  }
+
+  public void write(T record) throws IOException {
+    this.currentFileWriter.write(record);
+    this.currentRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrent();
+      openCurrent();
+    }
+  }
+
+  public void abort() throws IOException {
+    close();
+
+    WriterResult result = resultBuilder.build();
+
+    Tasks.foreach(result.contentFiles())
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  public WriterResult complete() throws IOException {
+    close();
+
+    return resultBuilder.build();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("partitionKey", partitionKey)
+        .add("format", format)
+        .toString();
+  }
+
+  private void openCurrent() {
+    if (partitionKey == null) {
+      // unpartitioned
+      currentFile = fileFactory.newOutputFile();
+    } else {
+      // partitioned
+      currentFile = fileFactory.newOutputFile(partitionKey);
+    }
+    currentFileWriter = writerFactory.createWriter(partitionKey, currentFile, format);
+    currentRows = 0;
+  }
+
+  private boolean shouldRollToNewFile() {
+    // TODO: ORC files currently do not support checking the target file size before they are closed
+    return !format.equals(FileFormat.ORC) &&
+        currentRows % ROWS_DIVISOR == 0 && currentFileWriter.length() >= targetFileSize;
+  }
+
+  private void closeCurrent() throws IOException {
+    if (currentFileWriter != null) {
+      currentFileWriter.close();
+
+      ContentFileT contentFile = currentFileWriter.toContentFile();
+      Metrics metrics = currentFileWriter.metrics();

Review comment:
       This looks like the only use of `ContentFileWriter#metrics()`. I think it would be better to simplify this and just expose `rowCount()` instead of all metrics.
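
   A sketch of the narrowed interface being suggested; method names other than `rowCount()` are taken from the quoted code, and the overall shape is an assumption:

   ```java
   import java.io.Closeable;

   // Instead of returning full Metrics, the writer only exposes what the rolling
   // writer actually needs for its bookkeeping.
   public interface ContentFileWriter<T, R> extends Closeable {
     void write(R record);
     long length();     // bytes written so far, used for the rolling decision
     long rowCount();   // replaces metrics() in RollingContentFileWriter#closeCurrent
     T toContentFile();
   }
   ```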




[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r527022154



##########
File path: core/src/main/java/org/apache/iceberg/io/DeltaWriterFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+/**
+ * Factory to create {@link DeltaWriter} instances; it relies on a few sub-factories to create the different kinds of writers.
+ */
+public interface DeltaWriterFactory<T> {
+
+  /**
+   * Create a {@link DeltaWriter} for the given partition key and context.
+   */
+  DeltaWriter<T> createDeltaWriter(PartitionKey partitionKey, Context context);
+
+  /**
+   * Create a factory to initialize the {@link FileAppender}.
+   */
+  FileAppenderFactory<T> createFileAppenderFactory();

Review comment:
       Well, we should only have one `FileAppenderFactory` per `TaskWriter`; each partition-level `DeltaWriter` can share that `FileAppenderFactory`. It's better to move it out of this factory (see the sketch below).

   The following `createDataFileWriterFactory` and `createEqualityDeleteWriterFactory` have similar issues.
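
   Roughly, the structure being suggested could look like the sketch below; the class and method names are assumed for illustration, not taken from this PR:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // The task-level writer owns a single appender factory and hands it to every
   // per-partition delta writer it creates, instead of each writer building its own.
   abstract class FanoutTaskWriterSketch<T> {
     private final FileAppenderFactory<T> appenderFactory;   // created once per task
     private final Map<PartitionKey, DeltaWriter<T>> writers = new HashMap<>();

     FanoutTaskWriterSketch(FileAppenderFactory<T> appenderFactory) {
       this.appenderFactory = appenderFactory;
     }

     DeltaWriter<T> writerFor(PartitionKey partition) {
       // Every partition's delta writer reuses the shared factory.
       return writers.computeIfAbsent(partition, key -> newDeltaWriter(key, appenderFactory));
     }

     // How the delta writer is assembled from the shared factory is engine-specific.
     protected abstract DeltaWriter<T> newDeltaWriter(PartitionKey partition,
                                                      FileAppenderFactory<T> factory);
   }
   ```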




[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526813412



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
       Oh, there's another issue. Consider these two cases:

   ```
   Case 1: Execute two transactions:

   txn1: INSERT(1); DELETE(1); INSERT(1);
   txn2: INSERT(1);
   ```

   This won't throw any duplicate key exception, because we don't check key existence across transactions.

   ```
   Case 2: Execute only one transaction:

   txn1: INSERT(1); INSERT(1);
   ```

   Here we will hit a `ValidationException` when executing the second `INSERT(1)`.

   Will the different behavior confuse users?

   For me, Iceberg doesn't check key existence on insert, so in theory we don't have to guarantee that Iceberg throws a duplicate key exception when users really do insert two duplicate records.




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523304596



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);

Review comment:
       Preconditions should have error strings.
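
   For example, following the style of the `checkNotNull` calls later in the same constructor (the wording here is only a suggestion):

   ```java
   Preconditions.checkArgument(equalityDeleteWriter == null,
       "Equality delete writer requires a position delete writer");
   ```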




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523302977



##########
File path: core/src/test/java/org/apache/iceberg/TestOverwrite.java
##########
@@ -94,7 +94,7 @@
 
   @Parameterized.Parameters(name = "formatVersion = {0}")
   public static Object[] parameters() {
-    return new Object[] { 1, 2 };
+    return new Object[] {1, 2};

Review comment:
       Looks like this file doesn't need to change.




[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526657886



##########
File path: core/src/main/java/org/apache/iceberg/ContentFileWriter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface ContentFileWriter<T, R> extends Closeable {

Review comment:
       In my view, the whole write workflow would look like the following:
   
   ```
   
                                                  TaskWriter
                                                      |
                                                      |
                                                      |
                             --------------------------------------------------
                             |                                                |
                             |                                                |
                             V                                                V
                        DeltaWriter                                     DeltaWriter
                       (Partition-1)                                    (Partition-2)
                             |                                  
                             |
       ------------------------------------------------
      |                      |                        |
      |                      |                        |
      V                      V                        V
   RollingFileWriter    RollingFileWriter      RollingFileWriter
    (Pos-Delete)          (Insert)              (Equality-Delete)
                              |
                              |
                              V
                 -----------------------------
                 |             |             |
                 |             |             |
                 |             |             |
                 V             V             V
            FileAppender    FileAppender    ...
             (closed)        (Openning)
               
   ```
   
   Each executor/task in the compute engine has a TaskWriter that writes generic records. If it uses the
   fanout policy, it will have multiple DeltaWriters, each writing the records of a single partition;
   if it uses the grouped policy in Spark, we might have just one DeltaWriter per TaskWriter. The
   DeltaWriter accepts INSERT/EQ-DELETE/POS-DELETE records, and for each kind of record there is a
   RollingFileWriter that rolls to a newly opened file appender once the current one reaches the size
   threshold.

   The RollingFileWriters all share the same rolling logic, so in theory it's good to define an abstract ContentFileWriter so that we don't have to define three kinds of RollingFileWriter.
   Another way is to define a BaseRollingFileWriter, put the common logic there, and have the DeltaWriter use it; when constructing the DeltaWriter, we would pass in the PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter subclasses (roughly as sketched below).
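
   A rough sketch of that second option; the class names come from the paragraph above, while the abstract method shapes are assumptions:

   ```java
   import java.io.IOException;

   // The rolling logic lives once in the base class; subclasses only decide how to
   // write a record, when to roll, and how to turn a closed file into metadata.
   abstract class BaseRollingFileWriter<F, T> {
     private long currentRows = 0;

     public void write(T record) throws IOException {
       doWrite(record);
       currentRows++;
       if (shouldRoll(currentRows)) {
         collect(closeCurrentFile());   // F is the DataFile or DeleteFile metadata
         openNewFile();
         currentRows = 0;
       }
     }

     protected abstract void doWrite(T record) throws IOException;
     protected abstract boolean shouldRoll(long rowsInCurrentFile);
     protected abstract F closeCurrentFile() throws IOException;
     protected abstract void openNewFile() throws IOException;
     protected abstract void collect(F completedFile);
   }

   // class DataRollingFileWriter<T>      extends BaseRollingFileWriter<DataFile, T> { ... }
   // class EqDeleteRollingFileWriter<T>  extends BaseRollingFileWriter<DeleteFile, T> { ... }
   // class PosDeleteRollingFileWriter<T> extends BaseRollingFileWriter<DeleteFile, PositionDelete<T>> { ... }
   ```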




[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r528029018



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
       Also, now that I'm thinking about it, we should _never_ throw an exception for this data because that would break processing. Instead, we should probably send the record to some callback (default no-op), log a warning, and either delete the previous copy or ignore the duplication.
   
   The next question is: if we do expect duplicate inserts, then what is the right behavior? Should we make the second insert replace the first? Or just ignore the duplication?
   
   I'm leaning toward adding a delete to replace the record, but that would only work if the two inserts were in the same checkpoint. If they arrive a few minutes apart, then the data would be duplicated in the table. But, since we consider the records identical by the insert key, I think it is correct to add a position delete for the first record.
   
   If we choose to ignore the duplication, we then need to keep track of both insert positions in case we get INSERT(1), INSERT(1), DELETE(1). The delete would need to drop both rows, not just the second. That's why I would say that a duplicate insert replaces the previously inserted row. That way we can keep track of just one FilePos per key and lower the complexity of tracking this.
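   A possible shape for that callback, sketched with a made-up `DuplicateRowListener` name (not something defined in this PR):
   
   ```java
   // Sketch: a pluggable hook for duplicate inserts so that processing never fails.
   @FunctionalInterface
   interface DuplicateRowListener<T> {
     void onDuplicateInsert(T row);
   
     // default behavior: do nothing, let the writer log a warning and keep going
     static <T> DuplicateRowListener<T> noop() {
       return row -> { };
     }
   }
   ```
   
   The delta writer would call `onDuplicateInsert(row)` and then either add a position delete for the earlier copy or ignore the duplication, instead of throwing.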





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523303875



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingContentFileWriter.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentFileWriter;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.util.Tasks;
+
+public class RollingContentFileWriter<ContentFileT, T> implements Closeable {
+
+  private static final int ROWS_DIVISOR = 1000;
+  private final PartitionKey partitionKey;
+  private final FileFormat format;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final ContentFileWriterFactory<ContentFileT, T> writerFactory;
+
+  private final WriterResult.Builder resultBuilder;
+
+  private EncryptedOutputFile currentFile = null;
+  private ContentFileWriter<ContentFileT, T> currentFileWriter = null;
+  private long currentRows = 0;
+
+  public RollingContentFileWriter(PartitionKey partitionKey, FileFormat format,
+                                  OutputFileFactory fileFactory, FileIO io,
+                                  long targetFileSize, ContentFileWriterFactory<ContentFileT, T> writerFactory) {
+    this.partitionKey = partitionKey;
+    this.format = format;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.writerFactory = writerFactory;
+
+    this.resultBuilder = WriterResult.builder();
+
+    openCurrent();
+  }
+
+  public CharSequence currentPath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentPos() {

Review comment:
       `pos` is typically used to indicate position in bytes rather than rows. How about changing this to something like `rowCount` instead?





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r528050744



##########
File path: core/src/main/java/org/apache/iceberg/ContentFileWriter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface ContentFileWriter<T, R> extends Closeable {

Review comment:
       I don't think we would require a different class for each engine. The file writers are currently created using an `AppenderFactory` and we could continue using that.
   
   Also, we would not need 3 rolling writers that are nearly identical. We would only need 2 because the position delete writer will be substantially different because of its sort order requirement.
   
    Because deletes may come in any order relative to inserts and we need to write out a sorted delete file, we will need to buffer the deletes in memory. That's not as expensive as it seems at first because the file location will be reused (multiple deletes in the same data file) and so the main cost is the number of positions that get deleted, which is the number of rows written and deleted in the same checkpoint per partition. The position delete writer and the logic to roll over to a new file are probably not going to be shared. I don't think I would even build a rolling position delete writer unless we see real cases where it is needed.
   
   That leaves just the equality delete writer and the data file writer. I think it would be cleaner to just have two rolling file writers because the rolling logic is so small. I went ahead and started a PR to show what it would look like: https://github.com/apache/iceberg/pull/1802
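    A rough sketch of that in-memory buffering, flushing deletes in (file, position) order; the class and method names here are made up for illustration and are not from this PR or from PR 1802:
    
    ```java
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    // Sketch: buffer deleted row positions per data file path and emit them sorted at flush
    // time, so the resulting position delete file satisfies the (path, position) ordering.
    class SortedPosDeleteBuffer {
      private final Map<String, List<Long>> deletesByFile = new TreeMap<>();  // paths kept sorted
    
      void delete(CharSequence path, long position) {
        deletesByFile.computeIfAbsent(path.toString(), p -> new ArrayList<>()).add(position);
      }
    
      void flush(PosDeleteConsumer writer) throws IOException {
        for (Map.Entry<String, List<Long>> entry : deletesByFile.entrySet()) {
          List<Long> positions = entry.getValue();
          Collections.sort(positions);                  // position order within each data file
          for (long position : positions) {
            writer.accept(entry.getKey(), position);
          }
        }
        deletesByFile.clear();
      }
    
      // Stand-in for whatever writes a single position delete, e.g. a PositionDeleteWriter.
      interface PosDeleteConsumer {
        void accept(String path, long position) throws IOException;
      }
    }
    ```
    
    The memory cost is exactly what is described above: one entry per (data file, deleted position) pair accumulated within the checkpoint for that partition.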





[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526675745



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -300,23 +299,12 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
 
       meta("delete-type", "position");
 
-      if (rowSchema != null && createWriterFunc != null) {
-        // the appender uses the row schema wrapped with position fields
-        appenderBuilder.schema(new org.apache.iceberg.Schema(
-            MetadataColumns.DELETE_FILE_PATH,
-            MetadataColumns.DELETE_FILE_POS,
-            NestedField.optional(
-                MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
-                MetadataColumns.DELETE_FILE_ROW_DOC)));
+      appenderBuilder.schema(DeletesUtil.posDeleteSchema(rowSchema));

Review comment:
       Agree,  the current version is incorrect. I will revert it. Thanks.





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523337318



##########
File path: core/src/main/java/org/apache/iceberg/ContentFileWriter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface ContentFileWriter<T, R> extends Closeable {

Review comment:
       I'm a little hesitant to add this. I think it is needed so that the same `RollingContentFileWriter` can be used for delete files and data files, but this introduces a lot of changes and new interfaces just to share about 20 lines of code. I'm not sure that it is worth the extra complexity, when compared to having one for data files and one for delete files.





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523316253



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;
+        }
+      });
+    }
+
+    dataWriter.write(row);
+  }
+
+  @Override
+  public void writeEqualityDelete(T equalityDelete) throws IOException {
+    if (!enableEqualityDelete()) {
+      throw new UnsupportedOperationException("Could not accept equality deletion.");
+    }
+
+    List<FilePos> existing = insertedRowMap.get(structLikeFun.apply(equalityDelete));
+
+    if (existing == null) {
+      // Delete the row which have been written by other completed delta writer.
+      equalityDeleteWriter.write(equalityDelete);
+    } else {
+      // Delete the rows which was written in current delta writer.
+      for (FilePos filePos : existing) {
+        posDeleteWriter.write(positionDelete.set(filePos.path(), filePos.pos(), null));

Review comment:
       I think this strategy will violate the ordering requirements of position delete files.
   
   Position delete files need to be ordered by file and position. Imagine 3 replacements for the same row key. The first creates insert 0, the second insert 1, and the third insert 2. The first adds 0 existing, the second adds 1, and the third adds 2. And the first time through this loop deletes position 0, the second deletes positions 0 and 1, and the third deletes positions 0, 1, and 2. The result is deleting positions 0, 0, 1, 0, 1, 2, which violates the order requirement.
   
   For a given inserted position, we need to track that it is deleted and write all of the deletes at one time so that they are all in order.
   
   I think this should change from using `StructLikeMap<List<FilePos>>` to `StructLikeMap<FilePos>`. Each time the row is deleted, get the current `FilePos` and add it to a `deletePositions` list. Then set the new row's `FilePos` in the map so that it is the one deleted the next time.
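   A sketch of that bookkeeping against the names already used in this PR (`FilePos`, `insertedRowMap`, the writers). These are method fragments only, under the assumption that buffered position deletes are flushed once per checkpoint:
   
   ```java
   // Sketch: keep only the latest FilePos per key and defer position deletes until flush,
   // so they can be written out in (path, position) order.
   private final List<FilePos> deletePositions = Lists.newArrayList();
   
   public void writeRow(T row) throws IOException {
     FilePos current = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
     // note: the key should be a defensive copy of the row's key fields
     FilePos previous = insertedRowMap.put(structLikeFun.apply(row), current);
     if (previous != null) {
       deletePositions.add(previous);     // the earlier copy of this key is now deleted
     }
     dataWriter.write(row);
   }
   
   public void writeEqualityDelete(T delete) throws IOException {
     FilePos previous = insertedRowMap.remove(structLikeFun.apply(delete));
     if (previous != null) {
       deletePositions.add(previous);     // row was written in this checkpoint: position delete
     }
     equalityDeleteWriter.write(delete);  // still needed to cover rows in older data files
   }
   
   private void flushPositionDeletes() throws IOException {
     // sort by path, then row position, to satisfy the delete file ordering requirement
     deletePositions.sort(Comparator.comparing((FilePos p) -> p.path().toString())
         .thenComparingLong(FilePos::pos));
     for (FilePos pos : deletePositions) {
       posDeleteWriter.write(positionDelete.set(pos.path(), pos.pos(), null));
     }
     deletePositions.clear();
   }
   ```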





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523303065



##########
File path: core/src/main/java/org/apache/iceberg/util/StructLikeSet.java
##########
@@ -29,7 +30,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 
-public class StructLikeSet implements Set<StructLike> {
+public class StructLikeSet extends AbstractCollection<StructLike> implements Set<StructLike> {

Review comment:
       These don't look related. Can you open a separate PR for this fix?





[GitHub] [iceberg] openinx closed pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663


   



[GitHub] [iceberg] rdblue commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-726956666


   Yes, will do. I meant to get to it yesterday, but I ran out of time. Thanks for your patience.



[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526840377



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;
+        }
+      });
+    }
+
+    dataWriter.write(row);
+  }
+
+  @Override
+  public void writeEqualityDelete(T equalityDelete) throws IOException {
+    if (!enableEqualityDelete()) {
+      throw new UnsupportedOperationException("Could not accept equality deletion.");
+    }
+
+    List<FilePos> existing = insertedRowMap.get(structLikeFun.apply(equalityDelete));
+
+    if (existing == null) {
+      // Delete the row which have been written by other completed delta writer.
+      equalityDeleteWriter.write(equalityDelete);
+    } else {
+      // Delete the rows which was written in current delta writer.
+      for (FilePos filePos : existing) {
+        posDeleteWriter.write(positionDelete.set(filePos.path(), filePos.pos(), null));

Review comment:
       Great point!





[GitHub] [iceberg] zhangminglei commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
zhangminglei commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-724130212


   Thanks to @openinx, our company has been looking forward to this CDC capability for a long time.



[GitHub] [iceberg] openinx commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-723900972


   Hi @rdblue, I've finished the basic abstraction for DeltaWriter (it's a draft, and I raised many TODO issues). It can write insert records, equality-delete records, and position-delete records (with or without the deleted row). What do you think of this abstraction work?
   
   (I'm still working on the integration so that I can import MySQL binlog events from the [flink-cdc-connector](https://github.com/ververica/flink-cdc-connectors) source into an Apache Iceberg table and read them back correctly.)



[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526673921



##########
File path: core/src/main/java/org/apache/iceberg/DataFileWriter.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class DataFileWriter<T> implements ContentFileWriter<DataFile, T> {

Review comment:
       I don't think so, because `FileAppender` is the basic writer shared by the data file writer, the equality-delete writer, and the position-delete writer; even the manifest writer is based on it. So putting `toDataFile` on `FileAppender` is not a good choice.





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523334457



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingContentFileWriter.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentFileWriter;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.util.Tasks;
+
+public class RollingContentFileWriter<ContentFileT, T> implements Closeable {
+
+  private static final int ROWS_DIVISOR = 1000;
+  private final PartitionKey partitionKey;
+  private final FileFormat format;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final ContentFileWriterFactory<ContentFileT, T> writerFactory;
+
+  private final WriterResult.Builder resultBuilder;
+
+  private EncryptedOutputFile currentFile = null;
+  private ContentFileWriter<ContentFileT, T> currentFileWriter = null;
+  private long currentRows = 0;
+
+  public RollingContentFileWriter(PartitionKey partitionKey, FileFormat format,
+                                  OutputFileFactory fileFactory, FileIO io,
+                                  long targetFileSize, ContentFileWriterFactory<ContentFileT, T> writerFactory) {
+    this.partitionKey = partitionKey;
+    this.format = format;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.writerFactory = writerFactory;
+
+    this.resultBuilder = WriterResult.builder();
+
+    openCurrent();
+  }
+
+  public CharSequence currentPath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentPos() {
+    return currentRows;
+  }
+
+  public void write(T record) throws IOException {
+    this.currentFileWriter.write(record);
+    this.currentRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrent();
+      openCurrent();
+    }
+  }
+
+  public void abort() throws IOException {
+    close();
+
+    WriterResult result = resultBuilder.build();
+
+    Tasks.foreach(result.contentFiles())
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  public WriterResult complete() throws IOException {
+    close();
+
+    return resultBuilder.build();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("partitionKey", partitionKey)
+        .add("format", format)
+        .toString();
+  }
+
+  private void openCurrent() {
+    if (partitionKey == null) {
+      // unpartitioned
+      currentFile = fileFactory.newOutputFile();
+    } else {
+      // partitioned
+      currentFile = fileFactory.newOutputFile(partitionKey);
+    }
+    currentFileWriter = writerFactory.createWriter(partitionKey, currentFile, format);
+    currentRows = 0;
+  }
+
+  private boolean shouldRollToNewFile() {
+    // TODO: ORC file now not support target file size before closed
+    return !format.equals(FileFormat.ORC) &&
+        currentRows % ROWS_DIVISOR == 0 && currentFileWriter.length() >= targetFileSize;
+  }
+
+  private void closeCurrent() throws IOException {
+    if (currentFileWriter != null) {
+      currentFileWriter.close();
+
+      ContentFileT contentFile = currentFileWriter.toContentFile();
+      Metrics metrics = currentFileWriter.metrics();

Review comment:
       Also, this class knows when a file has been rolled and keeps its own count of rows, so there is no need to get the number of rows passed to the current file writer. This should use `currentRows` instead. That way we can remove `metrics`.





[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523337575



##########
File path: core/src/main/java/org/apache/iceberg/DataFileWriter.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class DataFileWriter<T> implements ContentFileWriter<DataFile, T> {

Review comment:
       Much of this duplicates `FileAppender`. Should we use that interface instead and add `toDataFile` to it?





[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r527022154



##########
File path: core/src/main/java/org/apache/iceberg/io/DeltaWriterFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+/**
+ * Factory to create {@link DeltaWriter}, which have few dependencies factories to create different kinds of writers.
+ */
+public interface DeltaWriterFactory<T> {
+
+  /**
+   * Create a factory to initialize the {@link DeltaWriter}.
+   */
+  DeltaWriter<T> createDeltaWriter(PartitionKey partitionKey, Context context);
+
+  /**
+   * Create a factory to initialize the {@link FileAppender}.
+   */
+  FileAppenderFactory<T> createFileAppenderFactory();

Review comment:
       Well, we should only have one `FileAppenderFactory` per `TaskWriter`; each partition's `DeltaWriter` can share that `FileAppenderFactory`, so it's better to move it out of this factory.
   
   The following `createDataFileWriterFactory` and `createEqualityDeleteWriterFactory` have similar issues.
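   A sketch of that ownership: the TaskWriter builds one appender factory and hands it to each partition's DeltaWriter. The `FanoutDeltaTaskWriter` name and the constructor shapes are assumptions for illustration, not types from this PR:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.BiFunction;
   
   // Sketch: one FileAppenderFactory per TaskWriter, shared by every partition's DeltaWriter.
   class FanoutDeltaTaskWriter<T> {
     private final FileAppenderFactory<T> appenderFactory;   // engine-specific, created once
     private final BiFunction<PartitionKey, FileAppenderFactory<T>, DeltaWriter<T>> deltaWriterFactory;
     private final Map<PartitionKey, DeltaWriter<T>> writers = new HashMap<>();
   
     FanoutDeltaTaskWriter(FileAppenderFactory<T> appenderFactory,
                           BiFunction<PartitionKey, FileAppenderFactory<T>, DeltaWriter<T>> deltaWriterFactory) {
       this.appenderFactory = appenderFactory;
       this.deltaWriterFactory = deltaWriterFactory;
     }
   
     DeltaWriter<T> route(PartitionKey partition) {
       // each new partition gets its own DeltaWriter, but all of them reuse the same appender factory
       return writers.computeIfAbsent(partition.copy(),
           key -> deltaWriterFactory.apply(key, appenderFactory));
     }
   }
   ```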





[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r528278694



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
       Thinking about it again, my conclusion is:
   
   1. When an INSERT comes in:
      a. Eliminate the duplicated INSERT within the same checkpoint for a given TaskWriter (adding a delete to replace the record, as you said);
      b. Write it into the data file, and just ignore duplicated INSERTs across checkpoints.
   
   2. When a DELETE comes in:
      a. Write to the pos-delete file if the key exists in the `insertedRowMap`;
      b. Always write the equality-delete.
   
   
   First, eliminating a duplicated INSERT is easy because we already have the `insertedRowMap`: when a duplicated insert key arrives, we just replace the old INSERT (by writing to the pos-delete file) with the new INSERT (by writing to the insert data file). The `insertedRowMap` doesn't have to track a `List<FilePos>` per key; a single `FilePos` per key is enough.
   
   Second, eliminating duplicated INSERTs across checkpoints is high-cost and unnecessary, because the equality deletions will mask all duplicated INSERTs for the same key from previous checkpoints.
   
   
   The code should look like this:
   
   ```java
   @Override
     public void writeRow(T row) {
       if (allowEqDelete()) {
         RowOffset rowOffset = RowOffset.create(dataWriter.currentPath(), dataWriter.currentRows());
   
         // Copy the key to avoid messing up the insertedRowMap.
         StructLike copiedKey = asCopiedKey(row);
   
         // Adding a pos-delete to replace the old row.
         RowOffset previous = insertedRowMap.put(copiedKey, rowOffset);
         if (previous != null) {
           posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, row));
         }
       }
   
       dataWriter.write(row);
     }
   
     @Override
     public void writeEqualityDelete(T equalityDelete) {
       Preconditions.checkState(allowEqDelete(), "Could not accept equality deletion.");
   
       StructLike key = asKey(equalityDelete);
       RowOffset previous = insertedRowMap.get(key);
   
       if (previous != null) {
         posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, equalityDelete));
         insertedRowMap.remove(key);
       }
   
       eqDeleteWriter.write(equalityDelete);
     }
   ```
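   
   For completeness, the `RowOffset` referenced above is just a hypothetical (path, rowId) holder used by this sketch, not something that exists in the codebase; a minimal version could be: 
   
   ```java
   // Hypothetical helper: records which data file and row position an INSERT went to.
   class RowOffset {
     final CharSequence path;
     final long rowId;
   
     private RowOffset(CharSequence path, long rowId) {
       this.path = path;
       this.rowId = rowId;
     }
   
     static RowOffset create(CharSequence path, long rowId) {
       return new RowOffset(path, rowId);
     }
   }
   ```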
   
   
    






[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r527012748



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingPosDeleteWriter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.Set;
+import org.apache.iceberg.ContentFileWriter;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public class RollingPosDeleteWriter<T> extends RollingContentFileWriter<DeleteFile, PositionDelete<T>> {
+  private final Set<CharSequence> referencedDataFiles;
+
+  public RollingPosDeleteWriter(PartitionKey partitionKey, FileFormat format,
+                                OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                                ContentFileWriterFactory<DeleteFile, PositionDelete<T>> writerFactory) {
+    super(partitionKey, format, fileFactory, io, targetFileSize, writerFactory);
+
+    this.referencedDataFiles = Sets.newHashSet();
+  }
+
+  @Override
+  protected void beforeClose(ContentFileWriter<DeleteFile, PositionDelete<T>> writer) {
+    PositionDeleteWriter<T> positionDeleteWriter = (PositionDeleteWriter<T>) writer;
+    referencedDataFiles.addAll(positionDeleteWriter.referencedDataFiles());
+  }
+
+  public Set<CharSequence> referencedDataFiles() {
+    return referencedDataFiles;

Review comment:
       This `referencedDataFiles` set will be used for the `RowDelta#validateDataFilesExist` validation at commit time.
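   
   For context, a rough sketch (not part of this PR) of how a committer could consume that set, assuming `table`, `dataFiles` and `posDeleteFiles` are produced by the completed writers: 
   
   ```java
   // Commit inserts and position deletes together, and fail the commit if any
   // data file referenced by a position delete has been removed concurrently.
   RowDelta rowDelta = table.newRowDelta();
   for (DataFile dataFile : dataFiles) {
     rowDelta.addRows(dataFile);
   }
   for (DeleteFile deleteFile : posDeleteFiles) {
     rowDelta.addDeletes(deleteFile);
   }
   rowDelta.validateDataFilesExist(posDeleteWriter.referencedDataFiles());
   rowDelta.commit();
   ```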






[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526708985



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
       Let's consider three cases: 
   
   Case 1: Importing CDC data from a source table that has a unique key into an Iceberg table. In this case, it's impossible to see duplicated INSERT records with the same key.
   
   Case 2: Importing CDC data from a source table that does not have a unique key. The equality fields would have to be all columns of the table, and we could encounter duplicated rows because a database such as MySQL allows writing the same record twice. That's not a common case, though; most people create tables with unique keys, so I think it's OK not to address it. 
   
   Case 3: Upserting records into the Iceberg table, which requires primary keys. The `UPSERT` will be executed as a `DELETE` followed by an `INSERT`, so we can also ensure that duplicated keys won't reach the `writeRow` method (which is called for each `INSERT` row).
   
   So in general, I think it's OK to expect that we don't get multiple INSERTs for the same key. 
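   
   For illustration, a minimal sketch (not the PR's actual code) of how a Flink sink could map a CDC change onto the `DeltaWriter` calls; `deltaWriter` and the use of Flink's `RowKind`/`RowData` here are assumptions: 
   
   ```java
   void apply(RowKind kind, RowData row) throws IOException {
     switch (kind) {
       case INSERT:
       case UPDATE_AFTER:
         // The new image of an update is written as a plain INSERT.
         deltaWriter.writeRow(row);
         break;
       case DELETE:
       case UPDATE_BEFORE:
         // The old image of an update is retracted with an equality delete.
         deltaWriter.writeEqualityDelete(row);
         break;
     }
   }
   ```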






[GitHub] [iceberg] zhangminglei edited a comment on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
zhangminglei edited a comment on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-724130212


   Thanks @openinx, our company has been looking forward to this CDC capability for a long time. I can't wait to experiment with it!




[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526665980



##########
File path: core/src/main/java/org/apache/iceberg/ContentFileWriter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+public interface ContentFileWriter<T, R> extends Closeable {

Review comment:
       > Another way is to define a BaseRollingFileWriter and put the common logic there, then the DeltaWriter would use the BaseRollingFileWriter. when constructing the DeltaWriter, we would need to pass those subclasses PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, DataRollingFileWriter to it.
   
   In this way, we would need to create a PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter for each engine, for example FlinkPosDeleteRollingWriter and SparkPosDeleteRollingWriter, because different engines need to construct different FileAppenders to convert their specific data types into the unified parquet/orc/avro files.  I don't think that would be less complex than the current solution. 
   






[GitHub] [iceberg] rdblue commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r523302084



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -300,23 +299,12 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
 
       meta("delete-type", "position");
 
-      if (rowSchema != null && createWriterFunc != null) {
-        // the appender uses the row schema wrapped with position fields
-        appenderBuilder.schema(new org.apache.iceberg.Schema(
-            MetadataColumns.DELETE_FILE_PATH,
-            MetadataColumns.DELETE_FILE_POS,
-            NestedField.optional(
-                MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
-                MetadataColumns.DELETE_FILE_ROW_DOC)));
+      appenderBuilder.schema(DeletesUtil.posDeleteSchema(rowSchema));

Review comment:
       This changes the logic so that it is incorrect. If a row will not be written, then the schema should not include that row. The row is not written if either the `rowSchema` or the `createWriterFunc` is not set, but there are cases where the `rowSchema` is set and the other is not because of the way callers use this API.
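   
   A rough sketch of the behaviour being described, reconstructed from the removed lines above; the else branch is an assumption about the row-less schema, not code from this PR: 
   
   ```java
   if (rowSchema != null && createWriterFunc != null) {
     // The row will be written, so the appender schema wraps it with the position fields.
     appenderBuilder.schema(new org.apache.iceberg.Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,
         NestedField.optional(
             MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
             MetadataColumns.DELETE_FILE_ROW_DOC)));
   } else {
     // No row is written, so only the file path and position columns belong in the schema.
     appenderBuilder.schema(new org.apache.iceberg.Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS));
   }
   ```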






[GitHub] [iceberg] openinx commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-726652886


   @rdblue Could you please take a look at this prototype when you have time? I want to make sure our understanding of this design is roughly the same (after that, I would love to split this big patch into several smaller patches for review).  Thanks.




[GitHub] [iceberg] openinx commented on a change in pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r526846694



##########
File path: core/src/main/java/org/apache/iceberg/io/RollingContentFileWriter.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentFileWriter;
+import org.apache.iceberg.ContentFileWriterFactory;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.util.Tasks;
+
+public class RollingContentFileWriter<ContentFileT, T> implements Closeable {
+
+  private static final int ROWS_DIVISOR = 1000;
+  private final PartitionKey partitionKey;
+  private final FileFormat format;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final ContentFileWriterFactory<ContentFileT, T> writerFactory;
+
+  private final WriterResult.Builder resultBuilder;
+
+  private EncryptedOutputFile currentFile = null;
+  private ContentFileWriter<ContentFileT, T> currentFileWriter = null;
+  private long currentRows = 0;
+
+  public RollingContentFileWriter(PartitionKey partitionKey, FileFormat format,
+                                  OutputFileFactory fileFactory, FileIO io,
+                                  long targetFileSize, ContentFileWriterFactory<ContentFileT, T> writerFactory) {
+    this.partitionKey = partitionKey;
+    this.format = format;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.writerFactory = writerFactory;
+
+    this.resultBuilder = WriterResult.builder();
+
+    openCurrent();
+  }
+
+  public CharSequence currentPath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentPos() {
+    return currentRows;
+  }
+
+  public void write(T record) throws IOException {
+    this.currentFileWriter.write(record);
+    this.currentRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrent();
+      openCurrent();
+    }
+  }
+
+  public void abort() throws IOException {
+    close();
+
+    WriterResult result = resultBuilder.build();
+
+    Tasks.foreach(result.contentFiles())
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  public WriterResult complete() throws IOException {
+    close();
+
+    return resultBuilder.build();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("partitionKey", partitionKey)
+        .add("format", format)
+        .toString();
+  }
+
+  private void openCurrent() {
+    if (partitionKey == null) {
+      // unpartitioned
+      currentFile = fileFactory.newOutputFile();
+    } else {
+      // partitioned
+      currentFile = fileFactory.newOutputFile(partitionKey);
+    }
+    currentFileWriter = writerFactory.createWriter(partitionKey, currentFile, format);
+    currentRows = 0;
+  }
+
+  private boolean shouldRollToNewFile() {
+    // TODO: ORC file now not support target file size before closed
+    return !format.equals(FileFormat.ORC) &&
+        currentRows % ROWS_DIVISOR == 0 && currentFileWriter.length() >= targetFileSize;
+  }
+
+  private void closeCurrent() throws IOException {
+    if (currentFileWriter != null) {
+      currentFileWriter.close();
+
+      ContentFileT contentFile = currentFileWriter.toContentFile();
+      Metrics metrics = currentFileWriter.metrics();

Review comment:
       Yeah, `currentRows` is enough; we don't have to expose the `metrics` for now.






[GitHub] [iceberg] rdblue commented on pull request #1663: Flink: write the CDC records into apache iceberg tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#issuecomment-733383097


   @openinx, should we close this one in favor of #1818?

