Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/18 18:08:11 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5300: Core: Add base implementations for changelog tasks

aokolnychyi opened a new pull request, #5300:
URL: https://github.com/apache/iceberg/pull/5300

   This PR adds base implementations of the changelog tasks to be used by the changelog scan.
   For now, it relies on existing tests; more tests will come with the scan implementation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929354370


##########
core/src/main/java/org/apache/iceberg/BaseContentScanTask.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+abstract class BaseContentScanTask<ThisT, F extends ContentFile<F>>

Review Comment:
   We use `ThisT` throughout the code to give a parent class access to the actual type of the object. Usually, we don't declare bounds on it, but I had to in this case.
   
   It became this:
   
   ```
   ThisT extends ContentScanTask<F>
   ```
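   
   A minimal sketch of why the bound matters, using hypothetical simplified types (`ContentTask`, `BaseTask`, `DataTask`, `AddedTask`, `largerOf`) rather than the real Iceberg classes. Without a bound such as `ThisT extends ContentTask<F>`, the parent sees `ThisT` as a plain `Object` and cannot call `length()` on it. `self()` still lets the parent return the concrete subtype.
   
   ```java
   final class SelfTypeSketch {
     // Hypothetical stand-in for a content scan task over a file of type F.
     interface ContentTask<F> {
       F file();
   
       long length();
     }
   
     // ThisT is bounded so the parent can call ContentTask methods on values of
     // type ThisT while still handing back the concrete task type.
     abstract static class BaseTask<ThisT extends ContentTask<F>, F> implements ContentTask<F> {
       private final F file;
       private final long length;
   
       BaseTask(F file, long length) {
         this.file = file;
         this.length = length;
       }
   
       // Returns this object typed as ThisT, like the self() methods in the PR.
       protected abstract ThisT self();
   
       @Override
       public F file() {
         return file;
       }
   
       @Override
       public long length() {
         return length;
       }
   
       // other.length() only compiles because of the bound on ThisT.
       ThisT largerOf(ThisT other) {
         return other.length() > length ? other : self();
       }
     }
   
     // Mirrors the shape of BaseAddedRowsScanTask: ThisT is the task interface.
     interface DataTask extends ContentTask<String> {}
   
     static final class AddedTask extends BaseTask<DataTask, String> implements DataTask {
       AddedTask(String file, long length) {
         super(file, length);
       }
   
       @Override
       protected DataTask self() {
         return this;
       }
     }
   }
   ```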





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r923921920


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Is it possible to have an abstract factory method in the parent class:
   
   `protected abstract ThisT newSplitScanTask(T parentTask, long offset, long length);`
   
   and then push `merge()` up to the parent, using this factory method? Then `merge()` and `canMerge()` would live in the same class.
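   
   A minimal sketch of this suggestion, assuming simplified hypothetical types (`ParentTask`, `AbstractSplitTask`, `AddedRowsSplit`) instead of the PR's `SplitScanTask` hierarchy. Only the name `newSplitScanTask` comes from the comment above. Each subclass supplies the factory method and inherits both `canMerge()` and `merge()` from the parent.
   
   ```java
   final class SplitMergeSketch {
     // Hypothetical parent task; identity stands for "same underlying file".
     static final class ParentTask {
       final String path;
   
       ParentTask(String path) {
         this.path = path;
       }
     }
   
     // canMerge() and merge() live together in the abstract parent.
     abstract static class AbstractSplitTask<ThisT extends AbstractSplitTask<ThisT>> {
       private final ParentTask parentTask;
       private final long start;
       private final long length;
   
       AbstractSplitTask(ParentTask parentTask, long start, long length) {
         this.parentTask = parentTask;
         this.start = start;
         this.length = length;
       }
   
       ParentTask parentTask() {
         return parentTask;
       }
   
       long start() {
         return start;
       }
   
       long length() {
         return length;
       }
   
       // The only method each concrete split type has to implement.
       protected abstract ThisT newSplitScanTask(ParentTask parent, long start, long length);
   
       // Mergeable when both splits come from the same parent and `other`
       // begins exactly where this split ends.
       boolean canMerge(AbstractSplitTask<?> other) {
         return other.parentTask() == parentTask && start + length == other.start();
       }
   
       ThisT merge(AbstractSplitTask<?> other) {
         if (!canMerge(other)) {
           throw new IllegalArgumentException("Splits are not adjacent");
         }
         return newSplitScanTask(parentTask, start, length + other.length());
       }
     }
   
     static final class AddedRowsSplit extends AbstractSplitTask<AddedRowsSplit> {
       AddedRowsSplit(ParentTask parent, long start, long length) {
         super(parent, start, length);
       }
   
       @Override
       protected AddedRowsSplit newSplitScanTask(ParentTask parent, long start, long length) {
         return new AddedRowsSplit(parent, start, length);
       }
     }
   }
   ```
   
   The trade-off is that `merge()` in the parent becomes a thin wrapper around the factory call, so each subclass still writes one method; it just no longer has to repeat the merge logic.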



##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   `ThisT` and `T` are a bit confusing; how about renaming `T` to something like `ParentT`?



##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   There seem to be ways to define a `toStringHelper()` method at each level and reuse it through inheritance. Do you think it's worth it? https://github.com/google/guava/issues/1239
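   
   A minimal sketch of the per-level `toStringHelper()` pattern from the linked Guava issue. To keep it dependency-free, a hypothetical `Helper` class stands in for Guava's `MoreObjects.ToStringHelper`, and `ChangelogTask` and `AddedRowsTask` are simplified, hypothetical versions of the PR's classes. Each level adds its own fields, and `toString()` is defined once in the base class.
   
   ```java
   import java.util.StringJoiner;
   
   final class ToStringLevelsSketch {
     // Tiny stand-in for Guava's ToStringHelper: "ClassName{a=1, b=2}".
     static final class Helper {
       private final StringJoiner joiner;
   
       Helper(Object self) {
         this.joiner = new StringJoiner(", ", self.getClass().getSimpleName() + "{", "}");
       }
   
       Helper add(String name, Object value) {
         joiner.add(name + "=" + value);
         return this;
       }
   
       @Override
       public String toString() {
         return joiner.toString();
       }
     }
   
     static class ChangelogTask {
       private final int changeOrdinal;
       private final long commitSnapshotId;
   
       ChangelogTask(int changeOrdinal, long commitSnapshotId) {
         this.changeOrdinal = changeOrdinal;
         this.commitSnapshotId = commitSnapshotId;
       }
   
       // Each level contributes the fields it owns.
       protected Helper toStringHelper() {
         return new Helper(this)
             .add("change_ordinal", changeOrdinal)
             .add("commit_snapshot_id", commitSnapshotId);
       }
   
       @Override
       public final String toString() {
         return toStringHelper().toString();
       }
     }
   
     static class AddedRowsTask extends ChangelogTask {
       private final String file;
   
       AddedRowsTask(int changeOrdinal, long commitSnapshotId, String file) {
         super(changeOrdinal, commitSnapshotId);
         this.file = file;
       }
   
       // A subclass can only append after the fields its parent already added.
       @Override
       protected Helper toStringHelper() {
         return super.toStringHelper().add("file", file);
       }
     }
   }
   ```
   
   With this pattern, a subclass has no control over where its own fields appear relative to the parent's; they always come last.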



##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   Nit: I realize the names get long, but can we give these `SplitScanTask` classes unique names? There are now four new `SplitScanTask` classes, which makes navigation very hard (e.g. finding a specific class in the IDE). I was also initially confused into thinking it extended the `SplitScanTask` from `BaseFileScanTask`, which is actually a fifth, unrelated one.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930257835


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   I tried it locally. I couldn't control the order of fields in child classes, and split scan tasks couldn't benefit from it because they don't extend `BaseContentScanTask`. I'd probably just keep it as is.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929160975


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;
+
+  OffsetsAwareSplitScanTaskIterator(T parentTask, long parentTaskLength, List<Long> offsetList) {

Review Comment:
   I am a bit worried about adding to or changing the existing behavior in this class. I know we had a bug related to point 2, when the file size in metadata was incorrect. Handling point 1 may be a good idea, though. Let me think a bit more.





[GitHub] [iceberg] aokolnychyi commented on pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#issuecomment-1188008308

   @flyrain @rdblue @stevenzwu @karuppayya @szehon-ho @RussellSpitzer @anuragmantri 




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929345032


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   I changed the code in `next` a bit, so hopefully the purpose of `sizeIndex` is clearer now.
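   
   A simplified sketch of how an offsets-aware iterator can precompute split sizes and then walk them with a single index (the `splitIndex` naming discussed in this thread). It assumes the offsets are sorted start positions within the parent task (e.g. Parquet row group offsets) and that the last split runs to the parent's length. `OffsetSplits` and `Split` are hypothetical names; the sketch emits plain offset/length pairs rather than scan tasks and does not validate the offsets.
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   
   // Hypothetical, simplified offsets-aware splitter for one parent task.
   final class OffsetSplits implements Iterator<OffsetSplits.Split> {
     static final class Split {
       final long offset;
       final long length;
   
       Split(long offset, long length) {
         this.offset = offset;
         this.length = length;
       }
     }
   
     private final List<Long> offsets;
     private final List<Long> splitSizes;
     private int splitIndex = 0; // index of the next split to return
   
     OffsetSplits(long parentLength, List<Long> offsetList) {
       this.offsets = new ArrayList<>(offsetList);
       this.splitSizes = new ArrayList<>(offsets.size());
       // Each split ends where the next one starts...
       for (int i = 0; i + 1 < offsets.size(); i++) {
         splitSizes.add(offsets.get(i + 1) - offsets.get(i));
       }
       // ...and the last split runs to the end of the parent task.
       if (!offsets.isEmpty()) {
         splitSizes.add(parentLength - offsets.get(offsets.size() - 1));
       }
     }
   
     @Override
     public boolean hasNext() {
       return splitIndex < splitSizes.size();
     }
   
     @Override
     public Split next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       Split split = new Split(offsets.get(splitIndex), splitSizes.get(splitIndex));
       splitIndex += 1;
       return split;
     }
   }
   ```
   
   Because `offsets` and `splitSizes` line up index by index, one counter is enough to drive both lists.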





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930337321


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   Switched.



##########
core/src/main/java/org/apache/iceberg/SplitScanTaskIterator.java:
##########
@@ -0,0 +1,35 @@
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+
+/**
+ * An iterator that splits tasks.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+interface SplitScanTaskIterator<T extends ScanTask> extends Iterator<T> {
+
+  @FunctionalInterface
+  interface CreateSplitTaskFunction<T extends ScanTask> {

Review Comment:
   Done.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929139734


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   @szehon-ho, I am not sure that would be any cleaner, as the only purpose of `merge` would be to directly call that new abstract method. I feel like it would be pretty much the same.
   
   @stevenzwu, that's the existing logic from `FileScanTask`. If two tasks are adjacent after split planning, we merge them together into one.
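   
   A minimal sketch of that merging step, assuming splits arrive in planning order and that two splits are adjacent when they cover consecutive byte ranges of the same file. `AdjacentMergeSketch`, `Split`, and `mergeAdjacent` are hypothetical names, not Iceberg's API.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   final class AdjacentMergeSketch {
     // Hypothetical split: the byte range [start, start + length) of one file.
     static final class Split {
       final String path;
       final long start;
       final long length;
   
       Split(String path, long start, long length) {
         this.path = path;
         this.start = start;
         this.length = length;
       }
   
       boolean canMerge(Split other) {
         return path.equals(other.path) && start + length == other.start;
       }
   
       Split merge(Split other) {
         return new Split(path, start, length + other.length);
       }
     }
   
     // Folds each split into the previous one whenever the two are adjacent
     // ranges of the same file; otherwise the previous split is emitted as is.
     static List<Split> mergeAdjacent(List<Split> splits) {
       List<Split> merged = new ArrayList<>();
       Split current = null;
       for (Split split : splits) {
         if (current == null) {
           current = split;
         } else if (current.canMerge(split)) {
           current = current.merge(split);
         } else {
           merged.add(current);
           current = split;
         }
       }
       if (current != null) {
         merged.add(current);
       }
       return merged;
     }
   }
   ```
   
   Only neighbors in the input order are considered, so non-adjacent pieces of the same file stay separate.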





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929386089


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   Yeah, I see what you mean now. I don't know what the original interpretation was but I can switch to `splitIndex`. 
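The rename concerns the cursor that walks the precomputed split sizes. Below is a minimal sketch of an offsets-aware iterator using that naming. `OffsetSplitIterator` is a hypothetical stand-in that returns `{offset, length}` pairs instead of scan tasks. It assumes the offsets are sorted ascending and that the last split runs to the end of the file.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: turns sorted split offsets (e.g. Parquet row group
// starts) into (offset, length) ranges; the last range ends at fileLength.
final class OffsetSplitIterator implements Iterator<long[]> {
  private final List<Long> offsets;
  private final List<Long> splitSizes;
  private int splitIndex = 0; // index of the next split to return

  OffsetSplitIterator(List<Long> offsets, long fileLength) {
    this.offsets = offsets;
    this.splitSizes = new ArrayList<>(offsets.size());
    for (int i = 0; i < offsets.size(); i++) {
      // Each split ends where the next one starts, or at the end of the file.
      long end = i + 1 < offsets.size() ? offsets.get(i + 1) : fileLength;
      splitSizes.add(end - offsets.get(i));
    }
  }

  @Override
  public boolean hasNext() {
    return splitIndex < splitSizes.size();
  }

  @Override
  public long[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    long offset = offsets.get(splitIndex);
    long length = splitSizes.get(splitIndex);
    splitIndex += 1;
    return new long[] {offset, length};
  }
}
```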




[GitHub] [iceberg] aokolnychyi merged pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged PR #5300:
URL: https://github.com/apache/iceberg/pull/5300



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929344140


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   I managed to implement it. Could you take a look, @szehon-ho?




[GitHub] [iceberg] flyrain commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929415102


##########
core/src/main/java/org/apache/iceberg/SplitScanTaskIterator.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+
+/**
+ * An iterator that splits tasks.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+interface SplitScanTaskIterator<T extends ScanTask> extends Iterator<T> {
+
+  @FunctionalInterface
+  interface CreateSplitTaskFunction<T extends ScanTask> {

Review Comment:
   Minor naming suggestion:
   ```
   @FunctionalInterface
   interface SplitTaskCreator<T extends ScanTask> {
     T create(T parentTask, long offset, long length);
   }
   ```
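One way the suggested `SplitTaskCreator` could be used is to pass a method reference into a generic iterator, so that each task type does not need its own iterator subclass. This is a hedged sketch: `RangeTask` and `FixedSizeSplitIterator` are hypothetical, and the `T extends ScanTask` bound is dropped so the example compiles without Iceberg on the classpath.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Factory shape following the naming suggestion above (the ScanTask bound
// is omitted so this sketch compiles on its own).
@FunctionalInterface
interface SplitTaskCreator<T> {
  T create(T parentTask, long offset, long length);
}

// Hypothetical task type: a byte range of one file.
final class RangeTask {
  final long start;
  final long length;

  RangeTask(long start, long length) {
    this.start = start;
    this.length = length;
  }

  // Matches the SplitTaskCreator shape; the parent is unused in this toy model.
  static RangeTask split(RangeTask parentTask, long offset, long length) {
    return new RangeTask(offset, length);
  }
}

// Cuts [start, start + length) into chunks of at most targetSplitSize bytes,
// delegating task construction to the supplied creator.
final class FixedSizeSplitIterator<T> implements Iterator<T> {
  private final T parentTask;
  private final long end;
  private final long targetSplitSize;
  private final SplitTaskCreator<T> creator;
  private long offset;

  FixedSizeSplitIterator(T parentTask, long start, long length, long targetSplitSize,
                         SplitTaskCreator<T> creator) {
    if (targetSplitSize <= 0) {
      throw new IllegalArgumentException("targetSplitSize must be positive");
    }
    this.parentTask = parentTask;
    this.offset = start;
    this.end = start + length;
    this.targetSplitSize = targetSplitSize;
    this.creator = creator;
  }

  @Override
  public boolean hasNext() {
    return offset < end;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    long splitLength = Math.min(targetSplitSize, end - offset);
    T task = creator.create(parentTask, offset, splitLength);
    offset += splitLength;
    return task;
  }
}
```

Usage would look like `new FixedSizeSplitIterator<>(parent, parent.start, parent.length, 100, RangeTask::split)`, which yields ranges of 100, 100, and 50 bytes for a 250-byte parent.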




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930461872


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   Sounds good.




[GitHub] [iceberg] flyrain commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929340712


##########
core/src/main/java/org/apache/iceberg/BaseContentScanTask.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+abstract class BaseContentScanTask<ThisT, F extends ContentFile<F>>

Review Comment:
   Nit: does it make sense to use `T extends ScanTask` instead of `ThisT`, so that the type of `ThisT` is more obvious here?
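For context, `ThisT` here acts as a self-type parameter. It names the type a concrete task exposes itself as (for example, `BaseAddedRowsScanTask` returns itself as `AddedRowsScanTask` from `self()`), presumably so that shared base-class logic can hand back correctly typed results. The sketch below illustrates the pattern with hypothetical classes (`BaseTask`, `AddedRowsTask`, `BaseAddedRowsTask`) rather than Iceberg's actual hierarchy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical base class: ThisT is the type concrete tasks expose
// themselves as, so shared logic can return ThisT values without casts.
abstract class BaseTask<ThisT> {
  private final long length;

  BaseTask(long length) {
    this.length = length;
  }

  long taskLength() {
    return length;
  }

  // Each subclass returns "this", typed as ThisT.
  protected abstract ThisT self();

  // Each subclass knows how to build its own pieces.
  protected abstract List<ThisT> splitInto(long targetSize);

  // Shared logic: a task that already fits is returned as-is via self().
  List<ThisT> split(long targetSize) {
    if (targetSize <= 0) {
      throw new IllegalArgumentException("targetSize must be positive");
    }
    if (length <= targetSize) {
      return Collections.singletonList(self());
    }
    return splitInto(targetSize);
  }
}

// Hypothetical public-facing interface, similar in role to AddedRowsScanTask.
interface AddedRowsTask {
  long sizeInBytes();
}

final class BaseAddedRowsTask extends BaseTask<AddedRowsTask> implements AddedRowsTask {
  BaseAddedRowsTask(long length) {
    super(length);
  }

  @Override
  public long sizeInBytes() {
    return taskLength();
  }

  @Override
  protected AddedRowsTask self() {
    return this;
  }

  @Override
  protected List<AddedRowsTask> splitInto(long targetSize) {
    List<AddedRowsTask> parts = new ArrayList<>();
    for (long pos = 0; pos < taskLength(); pos += targetSize) {
      parts.add(new BaseAddedRowsTask(Math.min(targetSize, taskLength() - pos)));
    }
    return parts;
  }
}
```

Renaming the parameter to `T extends ScanTask` would add a bound, but the self-type role (the subtype that `self()` returns) is what the `ThisT` name is meant to convey.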




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929448922


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   Hm, maybe I wasn't clear. To clarify, I wasn't suggesting adding toString to the child classes and logging other fields like location, but rather reducing code by adding a toStringHelper() method to the base class (BaseContentScanTask) and reusing it (since that is where we already log file, partition_data, and residual).
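A minimal sketch of the `toStringHelper()` reuse described here. To keep it dependency-free, `ToStringBuilder` is a hypothetical stand-in for Guava's `MoreObjects.ToStringHelper`, and `ContentTask` and `ChangelogTask` are simplified placeholders for `BaseContentScanTask` and its changelog subclass.

```java
import java.util.StringJoiner;

// Hypothetical stand-in for Guava's MoreObjects.ToStringHelper so the
// sketch has no dependencies; renders "ClassName{k1=v1, k2=v2}".
final class ToStringBuilder {
  private final StringJoiner joiner;

  ToStringBuilder(Object self) {
    this.joiner = new StringJoiner(", ", self.getClass().getSimpleName() + "{", "}");
  }

  ToStringBuilder add(String name, Object value) {
    joiner.add(name + "=" + value);
    return this;
  }

  @Override
  public String toString() {
    return joiner.toString();
  }
}

// Simplified placeholder for the base task: logs the shared fields once.
abstract class ContentTask {
  private final String path;
  private final String partition;
  private final String residual;

  ContentTask(String path, String partition, String residual) {
    this.path = path;
    this.partition = partition;
    this.residual = residual;
  }

  protected ToStringBuilder toStringHelper() {
    return new ToStringBuilder(this)
        .add("file", path)
        .add("partition_data", partition)
        .add("residual", residual);
  }

  @Override
  public final String toString() {
    return toStringHelper().toString();
  }
}

// Simplified placeholder for a changelog task: appends only its own fields.
final class ChangelogTask extends ContentTask {
  private final int changeOrdinal;
  private final long commitSnapshotId;

  ChangelogTask(String path, String partition, String residual,
                int changeOrdinal, long commitSnapshotId) {
    super(path, partition, residual);
    this.changeOrdinal = changeOrdinal;
    this.commitSnapshotId = commitSnapshotId;
  }

  @Override
  protected ToStringBuilder toStringHelper() {
    return super.toStringHelper()
        .add("change_ordinal", changeOrdinal)
        .add("commit_snapshot_id", commitSnapshotId);
  }
}
```

With this shape, the base fields are logged in one place and each subclass only appends the fields it adds.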



##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Yea, it's not reducing a lot of code, it's just more symmetric for the reader. The classes below (OffsetsAwareSplitScanTaskIteratorImpl and FixedSizeSplitScanTaskIteratorImpl) already have this `newSplitTask` method, so we would just have one more just like that.
   
   ```
     private static class SplitScanTaskImpl
         extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
         implements AddedRowsScanTask {
   
       SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
         super(parentTask, offset, length);
       }
   
       @Override
       public List<DeleteFile> deletes() {
         return parentTask().deletes();
       }
   
       @Override
       protected SplitScanTaskImpl newSplitTask(AddedRowsScanTask parentTask, long offset, long length) {
         return new SplitScanTaskImpl(parentTask, offset, length);
       }
     }
   ```
   
   So merge() is not just directly calling newSplitTask but actually doing logic that corresponds to canMerge(), which is now clearer because both are in the same class.
   
   ```
     abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>
         implements ContentScanTask<F>, ChangelogScanTask, MergeableScanTask<ThisT> {
       ...
       abstract ThisT newSplitTask(T parentTask, long offset, long length);
   
       @Override
       public boolean canMerge(ScanTask other) {
         if (getClass().equals(other.getClass())) {
           SplitScanTask<?, ?, ?> that = (SplitScanTask<?, ?, ?>) other;
           return changeOrdinal() == that.changeOrdinal() &&
               commitSnapshotId() == that.commitSnapshotId() &&
               file().equals(that.file()) &&
               start() + length() == that.start();
         } else {
           return false;
         }
       }
   
       @Override
       public ThisT merge(ScanTask other) {
         SplitScanTask<?, ?, ?> that = (SplitScanTask<?, ?, ?>) other;
         return newSplitTask(parentTask(), start(), length() + that.length());
       }
       ...
     }
   ```
   
   If we can somehow get rid of all these redundant classes with a lambda or something, that would be best, but it's probably not possible.
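The structure proposed here, with `canMerge()` and `merge()` side by side in the abstract base and each concrete split type only supplying a `newSplitTask` factory, can be sketched independently of Iceberg's types. `AbstractSplit` and `AddedRowsSplit` are hypothetical, and a file name stands in for the parent task.

```java
// Hypothetical base: the adjacency rule and merge arithmetic live together
// here, while concrete split types only implement the newSplitTask factory.
abstract class AbstractSplit<ThisT extends AbstractSplit<ThisT>> {
  private final String file;
  private final long start;
  private final long length;

  AbstractSplit(String file, long start, long length) {
    this.file = file;
    this.start = start;
    this.length = length;
  }

  String file() {
    return file;
  }

  long start() {
    return start;
  }

  long length() {
    return length;
  }

  // Factory hook: the only thing each concrete split type has to provide.
  protected abstract ThisT newSplitTask(String file, long start, long length);

  // Same concrete type, same file, and the other split starts where this one ends.
  boolean canMerge(Object other) {
    if (other != null && getClass().equals(other.getClass())) {
      AbstractSplit<?> that = (AbstractSplit<?>) other;
      return file.equals(that.file()) && start + length == that.start();
    }
    return false;
  }

  // Callers are expected to check canMerge() first.
  ThisT merge(Object other) {
    AbstractSplit<?> that = (AbstractSplit<?>) other;
    return newSplitTask(file, start, length + that.length());
  }
}

final class AddedRowsSplit extends AbstractSplit<AddedRowsSplit> {
  AddedRowsSplit(String file, long start, long length) {
    super(file, start, length);
  }

  @Override
  protected AddedRowsSplit newSplitTask(String file, long start, long length) {
    return new AddedRowsSplit(file, start, length);
  }
}
```

The design choice is that the adjacency rule and the length arithmetic are defined once, so adding another split type only requires implementing the factory method.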
   




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929238804


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   That sounds great




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930457668


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Yea, I didn't realize that you would remove newSplitTask from the bottom two methods, leaving this one as the only one. But I think it's better.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929344293


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   Changed.



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929381652


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   >  it is an index of the size of this iterator that is used to access both offsets and split sizes lists
   
   That is exactly why I found the name `sizeIndex` a little confusing: the index is used to construct one split from the entries at the same position in the offsets and split sizes lists.
   
   ```
       long offset = offsets.get(sizeIndex);
       long splitSize = splitSizes.get(sizeIndex);
       sizeIndex += 1; // create 1 split per offset
       return createSplitTaskFunc.apply(parentTask, offset, splitSize);
   ```
   
   But this is a very minor thing.
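The index in the snippet advances once per emitted split, so it counts splits rather than sizes. Below is a minimal, self-contained sketch of an offsets-aware iterator using the `splitIndex` name suggested in this thread. `OffsetSplitIterator` and `Split` are hypothetical stand-ins, not Iceberg classes, and the sketch assumes each split runs from its offset to the next offset, with the last split running to the end of the parent task.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: one split per offset; each split ends at the next
// offset, and the last split ends at the parent task length (assumption).
class OffsetSplitIterator implements Iterator<OffsetSplitIterator.Split> {

  static final class Split {
    final long offset;
    final long length;

    Split(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
  }

  private final List<Long> offsets;
  private final List<Long> splitSizes;
  private int splitIndex = 0; // position of the next split to emit

  OffsetSplitIterator(long parentLength, List<Long> offsets) {
    this.offsets = offsets;
    this.splitSizes = new ArrayList<>(offsets.size());
    for (int i = 0; i < offsets.size(); i++) {
      long end = i + 1 < offsets.size() ? offsets.get(i + 1) : parentLength;
      splitSizes.add(end - offsets.get(i));
    }
  }

  @Override
  public boolean hasNext() {
    return splitIndex < offsets.size();
  }

  @Override
  public Split next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    long offset = offsets.get(splitIndex);
    long size = splitSizes.get(splitIndex);
    splitIndex += 1; // create 1 split per offset
    return new Split(offset, size);
  }
}
```

With this naming, both lists are read at `splitIndex`, which makes it clear that the index identifies a split rather than a size.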



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929157459


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   I copied this code as it was. From what I see, it looks like an index to the size of this iterator that is used to access both offsets and split sizes. Let me do some renames to make it a bit cleaner.



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927206586


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>

Review Comment:
   nit: `ThisT` seems a little uncommon compared to just `T`



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927207565


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>

Review Comment:
   Does `ThisT` refer to this class, `BaseChangelogContentScanTask`?
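For context: a type parameter like `ThisT` is commonly used as a "self type", letting a generic base class return the concrete task type without casts. In the diff, `BaseAddedRowsScanTask` binds `ThisT` to the interface `AddedRowsScanTask` and overrides `self()` to return `this`, so `ThisT` names the type the subclass exposes rather than `BaseChangelogContentScanTask` itself. The sketch below illustrates only the idiom; `BaseTask`, `AddedTask`, `BaseAddedTask`, and `splitOrSelf` are hypothetical names, not Iceberg's code.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical task interface, standing in for something like AddedRowsScanTask.
interface AddedTask {
  long length();
}

// Hypothetical base class: ThisT is the type its shared methods hand back.
abstract class BaseTask<ThisT> {
  private final long length;

  BaseTask(long length) {
    this.length = length;
  }

  public long length() {
    return length;
  }

  // Subclasses return `this`, typed as ThisT.
  protected abstract ThisT self();

  // Shared logic can return List<ThisT> without an unchecked cast.
  List<ThisT> splitOrSelf(boolean splittable) {
    if (!splittable) {
      return Collections.singletonList(self());
    }
    throw new UnsupportedOperationException("splitting is omitted in this sketch");
  }
}

// Binds ThisT to the interface, mirroring the shape of the diff.
final class BaseAddedTask extends BaseTask<AddedTask> implements AddedTask {
  BaseAddedTask(long length) {
    super(length);
  }

  @Override
  protected AddedTask self() {
    return this;
  }
}
```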



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930256627


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Okay, I've changed the approach to have `merge` in the parent. Let me know if that's what you meant.



[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930457668


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Yeah, I didn't realize that you would remove `newSplitTask` from the bottom two methods, leaving this one as the only one. But I think it's better this way; I like the method name.



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927211216


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;
+
+  OffsetsAwareSplitScanTaskIterator(T parentTask, long parentTaskLength, List<Long> offsetList) {

Review Comment:
   Do we need to validate that:
   
   1. `offsetList` is in strictly ascending order
   2. the last offset is smaller than `parentTaskLength`
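If such checks were added, they could look roughly like the hypothetical helper below. `OffsetValidation` and `validateOffsets` are invented names, and plain `IllegalArgumentException` is used only to keep the sketch dependency-free. It checks exactly the two conditions listed and nothing else.

```java
import java.util.List;

final class OffsetValidation {
  private OffsetValidation() {}

  // Hypothetical check for the two conditions raised in the review:
  // 1. offsets are strictly ascending
  // 2. the last offset is smaller than the parent task length
  static void validateOffsets(List<Long> offsets, long parentLength) {
    for (int i = 1; i < offsets.size(); i++) {
      // relational operators unbox Long, so this compares values
      if (offsets.get(i) <= offsets.get(i - 1)) {
        throw new IllegalArgumentException("Offsets must be strictly ascending: " + offsets);
      }
    }
    if (!offsets.isEmpty() && offsets.get(offsets.size() - 1) >= parentLength) {
      throw new IllegalArgumentException(
          "Last offset must be smaller than parent task length " + parentLength + ": " + offsets);
    }
  }
}
```

Because the loop already rejects any non-increasing pair, checking only the last offset against the length is enough to bound every offset.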



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927211417


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   nit: is `splitIndex` more accurate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r923921920


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   Is it possible to have an abstract construction method in the parent class:
   
   protected abstract ThisT newSplitScanTask(T parentTask, long start, long length);
   
   and then push `merge()` up to the parent, using this construction method? Then `merge()` and `canMerge()` would live in the same parent class.
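A minimal sketch of the proposed shape, keeping only the start/length bookkeeping: the parent owns both `canMerge` and `merge`, and each subclass supplies only the factory method. All names here (`SplitTask`, `newSplitTask`, `DemoSplit`) are hypothetical, and the adjacency rule in `canMerge` (same parent task instance, next split starts where this one ends) is an assumption for illustration, not the rule Iceberg uses.

```java
// Hypothetical sketch: merge() and canMerge() live in the parent; each
// subclass only implements the newSplitTask factory.
abstract class SplitTask<ThisT extends SplitTask<ThisT, P>, P> {
  private final P parentTask;
  private final long start;
  private final long length;

  SplitTask(P parentTask, long start, long length) {
    this.parentTask = parentTask;
    this.start = start;
    this.length = length;
  }

  P parentTask() {
    return parentTask;
  }

  long start() {
    return start;
  }

  long length() {
    return length;
  }

  // Factory supplied by subclasses; replaces per-subclass merge() bodies.
  protected abstract ThisT newSplitTask(P parentTask, long start, long length);

  // Assumed rule: same parent task instance, and `other` starts where this split ends.
  boolean canMerge(SplitTask<?, ?> other) {
    return other.parentTask() == parentTask && other.start() == start + length;
  }

  ThisT merge(SplitTask<?, ?> other) {
    if (!canMerge(other)) {
      throw new IllegalArgumentException("Cannot merge non-adjacent splits");
    }
    return newSplitTask(parentTask, start, length + other.length());
  }
}

// A concrete split only has to say how to construct itself.
final class DemoSplit extends SplitTask<DemoSplit, String> {
  DemoSplit(String parentTask, long start, long length) {
    super(parentTask, start, length);
  }

  @Override
  protected DemoSplit newSplitTask(String parentTask, long start, long length) {
    return new DemoSplit(parentTask, start, length);
  }
}
```

The trade-off is one extra abstract method per subclass in exchange for keeping the merge logic and its precondition side by side.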



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929163723


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   Let me think of names that are unique but not overly long.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929344496


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;
+
+  OffsetsAwareSplitScanTaskIterator(T parentTask, long parentTaskLength, List<Long> offsetList) {

Review Comment:
   I added validation for sorted offsets, which is required by the spec.
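
A minimal sketch of what such a check could look like. It assumes that offsets are absolute byte positions within the data file (for example, Parquet row group starts) and that duplicate offsets should also be rejected. `SplitOffsets` and `validate` are hypothetical names for illustration, not the code added in this PR.

```java
import java.util.List;

// Hypothetical helper, not Iceberg's actual code: checks that split offsets
// are strictly ascending and lie inside the file before they are used to cut splits.
final class SplitOffsets {
  private SplitOffsets() {
  }

  static void validate(List<Long> offsets, long fileLength) {
    if (offsets.isEmpty()) {
      throw new IllegalArgumentException("Offsets must not be empty");
    }
    long previous = -1L;
    for (long offset : offsets) {
      // Assumption: equal offsets would produce zero-length splits, so reject them too.
      if (offset <= previous) {
        throw new IllegalArgumentException("Offsets must be sorted in ascending order: " + offsets);
      }
      if (offset >= fileLength) {
        throw new IllegalArgumentException(
            "Offset " + offset + " is outside a file of length " + fileLength);
      }
      previous = offset;
    }
  }
}
```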



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929144043


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   The string representation would include the actual class name. I thought that would be sufficient. I don't think we include the file path or location of delete files right now. The rest is the same across changelog tasks.
   
   Locations may contain sensitive data. I'd probably keep it as-is for now to match the existing behavior. We can definitely reconsider this in the future.
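
To illustrate the point about class names: a `toString()` defined once in a base class still reports the concrete subclass, because `getClass()` is resolved at runtime. Guava's `MoreObjects.toStringHelper(this)` uses the runtime simple class name in the same way. The classes below are hypothetical stand-ins, and plain string concatenation (mimicking the helper's `Name{key=value}` format) replaces Guava so the sketch has no dependencies.

```java
// Hypothetical stand-in for a shared changelog task base class.
abstract class ChangelogTaskSketch {
  private final int changeOrdinal;
  private final long commitSnapshotId;

  ChangelogTaskSketch(int changeOrdinal, long commitSnapshotId) {
    this.changeOrdinal = changeOrdinal;
    this.commitSnapshotId = commitSnapshotId;
  }

  @Override
  public String toString() {
    // getClass() returns the runtime type, so each concrete task prints its own name.
    return getClass().getSimpleName()
        + "{change_ordinal=" + changeOrdinal
        + ", commit_snapshot_id=" + commitSnapshotId + "}";
  }
}

// Hypothetical concrete task types sharing the inherited toString().
final class AddedRowsTaskSketch extends ChangelogTaskSketch {
  AddedRowsTaskSketch(int changeOrdinal, long commitSnapshotId) {
    super(changeOrdinal, commitSnapshotId);
  }
}

final class DeletedDataFileTaskSketch extends ChangelogTaskSketch {
  DeletedDataFileTaskSketch(int changeOrdinal, long commitSnapshotId) {
    super(changeOrdinal, commitSnapshotId);
  }
}
```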



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929148221


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   I agree. I was trying to keep the declaration on one line since the list of type parameters is long, but let me find a reasonable formatting that allows using `ParentT`.



[GitHub] [iceberg] aokolnychyi commented on pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#issuecomment-1196960023

   Thanks for reviewing, @flyrain @stevenzwu @szehon-ho!
   
   I merged this to unblock the scan implementation. If anyone has any additional comments, I'll be happy to follow up.


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929157459


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareSplitScanTaskIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An iterator that splits tasks using split offsets such as row group offsets in Parquet.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+abstract class OffsetsAwareSplitScanTaskIterator<T extends ScanTask> implements Iterator<T> {
+  private final T parentTask;
+  private final List<Long> offsets;
+  private final List<Long> splitSizes;
+  private int sizeIdx = 0;

Review Comment:
   I copied this code as is. From what I can tell, it is the iterator's current position, used to index into both the offsets and the split sizes. Let me rename a few things to make it a bit cleaner.
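
A simplified sketch of that indexing scheme, assuming split `i` starts at `offsets.get(i)` and ends where the next offset begins (or at the end of the file for the last split). `OffsetSplitIterator` and `splitIndex` are hypothetical names, and each split is returned as a `{start, length}` pair instead of a real scan task.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch, not the Iceberg class: one position index (splitIndex)
// addresses both lists because split i starts at offsets[i] and spans splitSizes[i] bytes.
final class OffsetSplitIterator implements Iterator<long[]> {
  private final List<Long> offsets;
  private final List<Long> splitSizes = new ArrayList<>();
  private int splitIndex = 0;

  OffsetSplitIterator(List<Long> offsets, long fileLength) {
    this.offsets = offsets;
    for (int i = 0; i < offsets.size(); i++) {
      // A split ends where the next one starts; the last split ends at the file end.
      long end = i + 1 < offsets.size() ? offsets.get(i + 1) : fileLength;
      splitSizes.add(end - offsets.get(i));
    }
  }

  @Override
  public boolean hasNext() {
    return splitIndex < splitSizes.size();
  }

  @Override
  public long[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    long[] split = {offsets.get(splitIndex), splitSizes.get(splitIndex)};
    splitIndex += 1;
    return split;
  }
}
```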



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r929237705


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())
+        .add("partition_data", file().partition())
+        .add("residual", residual())
+        .toString();
+  }
+
+  abstract static class SplitScanTask<ThisT, T extends ContentScanTask<F> & ChangelogScanTask, F extends ContentFile<F>>

Review Comment:
   @szehon-ho, instead of making the base class abstract and extending it everywhere, I can probably accept a closure to construct a split scan task in the base class. Then we won't need any subclasses. I think that's going to be cleaner. What do you think?
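
A rough sketch of the closure-based approach. It assumes a single generic splitter that receives a factory for building splits instead of relying on one subclass per task type. `SplitFactory`, `RangeTask`, and `FixedSizeSplitter` are hypothetical names for illustration only; they are not the interfaces in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical factory that builds a split of type T for a byte range of its parent.
@FunctionalInterface
interface SplitFactory<T> {
  T create(T parent, long offset, long length);
}

// Hypothetical minimal task type used only for this sketch.
final class RangeTask {
  final String kind;
  final long start;
  final long length;

  RangeTask(String kind, long start, long length) {
    this.kind = kind;
    this.start = start;
    this.length = length;
  }
}

// One generic splitter; task types supply a lambda instead of a subclass.
final class FixedSizeSplitter {
  private FixedSizeSplitter() {
  }

  static <T> List<T> split(
      T parent, long parentStart, long parentLength, long targetSize, SplitFactory<T> factory) {
    if (targetSize <= 0) {
      throw new IllegalArgumentException("targetSize must be positive");
    }
    List<T> splits = new ArrayList<>();
    long offset = parentStart;
    long remaining = parentLength;
    while (remaining > 0) {
      long length = Math.min(targetSize, remaining);
      splits.add(factory.create(parent, offset, length));
      offset += length;
      remaining -= length;
    }
    return splits;
  }
}
```

With this shape, each task type only supplies a lambda such as `(parent, offset, length) -> new RangeTask(parent.kind, offset, length)`, and the splitting loop lives in one place.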



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930224242


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>
+    extends BaseContentScanTask<ThisT, F>
+    implements ChangelogScanTask {
+
+  private final int changeOrdinal;
+  private final long commitSnapshotId;
+
+  BaseChangelogContentScanTask(int changeOrdinal, long commitSnapshotId, F file,
+                               String schemaString, String specString, ResidualEvaluator residuals) {
+    super(file, schemaString, specString, residuals);
+    this.changeOrdinal = changeOrdinal;
+    this.commitSnapshotId = commitSnapshotId;
+  }
+
+  @Override
+  public int changeOrdinal() {
+    return changeOrdinal;
+  }
+
+  @Override
+  public long commitSnapshotId() {
+    return commitSnapshotId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("change_ordinal", changeOrdinal)
+        .add("commit_snapshot_id", commitSnapshotId)
+        .add("file", file().path())

Review Comment:
   Oh, okay, got it now. Let me explore this idea.



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927262854


##########
core/src/main/java/org/apache/iceberg/BaseAddedRowsScanTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class BaseAddedRowsScanTask
+    extends BaseChangelogContentScanTask<AddedRowsScanTask, DataFile>
+    implements AddedRowsScanTask {
+
+  private final DeleteFile[] deletes;
+
+  BaseAddedRowsScanTask(int changeOrdinal, long commitSnapshotId, DataFile file, DeleteFile[] deletes,
+                        String schemaString, String specString, ResidualEvaluator residuals) {
+    super(changeOrdinal, commitSnapshotId, file, schemaString, specString, residuals);
+    this.deletes = deletes != null ? deletes : new DeleteFile[0];
+  }
+
+  @Override
+  protected AddedRowsScanTask self() {
+    return this;
+  }
+
+  @Override
+  public List<DeleteFile> deletes() {
+    return ImmutableList.copyOf(deletes);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingOffsets(List<Long> offsets) {
+    return () -> new OffsetsAwareSplitScanTaskIteratorImpl(this, offsets);
+  }
+
+  @Override
+  protected Iterable<AddedRowsScanTask> splitUsingFixedSize(long targetSplitSize) {
+    return () -> new FixedSizeSplitScanTaskIteratorImpl(this, targetSplitSize);
+  }
+
+  private static class SplitScanTaskImpl
+      extends SplitScanTask<SplitScanTaskImpl, AddedRowsScanTask, DataFile>
+      implements AddedRowsScanTask {
+
+    SplitScanTaskImpl(AddedRowsScanTask parentTask, long offset, long length) {
+      super(parentTask, offset, length);
+    }
+
+    @Override
+    public List<DeleteFile> deletes() {
+      return parentTask().deletes();
+    }
+
+    @Override
+    public SplitScanTaskImpl merge(ScanTask other) {
+      SplitScanTaskImpl that = (SplitScanTaskImpl) other;
+      return new SplitScanTaskImpl(parentTask(), start(), length() + that.length());

Review Comment:
   I didn't quite follow the merge semantics here. We just add the length but lose track of `other` in the returned object.
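
One way to reason about this, assuming `merge` is only ever called on two splits of the same parent where `other` begins exactly where `this` ends: the merged range `[start, start + length + other.length)` covers both inputs, so nothing from `other` is lost apart from the object itself. The excerpt does not show whether the PR enforces that adjacency, so the hypothetical `FileSplit` below checks it explicitly.

```java
// Hypothetical split: the byte range [start, start + length) of one parent file.
final class FileSplit {
  final String parentPath;
  final long start;
  final long length;

  FileSplit(String parentPath, long start, long length) {
    this.parentPath = parentPath;
    this.start = start;
    this.length = length;
  }

  // True only when other belongs to the same parent and begins where this split ends.
  boolean isAdjacent(FileSplit other) {
    return parentPath.equals(other.parentPath) && start + length == other.start;
  }

  // Extending the length absorbs other's whole range, so no data is dropped.
  FileSplit merge(FileSplit other) {
    if (!isAdjacent(other)) {
      throw new IllegalArgumentException("Can only merge adjacent splits of the same file");
    }
    return new FileSplit(parentPath, start, length + other.length);
  }
}
```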



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r927206586


##########
core/src/main/java/org/apache/iceberg/BaseChangelogContentScanTask.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+abstract class BaseChangelogContentScanTask<ThisT, F extends ContentFile<F>>

Review Comment:
   nit: `ThisT` seems a little uncommon compared to just `T`
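
For context on the naming: `ThisT` here acts as a self type. The base class is parameterized by the concrete task type so that shared logic can return that type without casts (compare `self()` in `BaseAddedRowsScanTask` above, which returns `AddedRowsScanTask`). A minimal sketch with hypothetical class names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: ThisT is the concrete task type, not an element type.
abstract class SketchTask<ThisT> {
  final long start;
  final long length;

  SketchTask(long start, long length) {
    this.start = start;
    this.length = length;
  }

  // Each subclass returns itself typed as ThisT.
  protected abstract ThisT self();

  // Each subclass builds a new instance of its own type for a sub-range.
  protected abstract ThisT copy(long newStart, long newLength);

  // Shared splitting logic, written once, that hands back the concrete type.
  List<ThisT> split(long targetSize) {
    if (targetSize <= 0) {
      throw new IllegalArgumentException("targetSize must be positive");
    }
    List<ThisT> result = new ArrayList<>();
    if (length <= targetSize) {
      result.add(self());
      return result;
    }
    long end = start + length;
    for (long offset = start; offset < end; offset += targetSize) {
      result.add(copy(offset, Math.min(targetSize, end - offset)));
    }
    return result;
  }
}

// Binding ThisT to the subclass lets callers get List<AddedRowsSketch> back.
final class AddedRowsSketch extends SketchTask<AddedRowsSketch> {
  AddedRowsSketch(long start, long length) {
    super(start, length);
  }

  @Override
  protected AddedRowsSketch self() {
    return this;
  }

  @Override
  protected AddedRowsSketch copy(long newStart, long newLength) {
    return new AddedRowsSketch(newStart, newLength);
  }
}
```

Plain `T` would compile just as well; the choice is only about signaling that the parameter stands for the implementing type.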



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5300: Core: Add base implementations for changelog tasks

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5300:
URL: https://github.com/apache/iceberg/pull/5300#discussion_r930329324


##########
core/src/main/java/org/apache/iceberg/SplitScanTaskIterator.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Iterator;
+
+/**
+ * An iterator that splits tasks.
+ *
+ * @param <T> the Java type of tasks produced by this iterator
+ */
+interface SplitScanTaskIterator<T extends ScanTask> extends Iterator<T> {
+
+  @FunctionalInterface
+  interface CreateSplitTaskFunction<T extends ScanTask> {

Review Comment:
   Missed it. That's a good idea. Let me change it.


