Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/18 20:34:38 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5301: Core: Support building custom tasks in ManifestGroup

aokolnychyi opened a new pull request, #5301:
URL: https://github.com/apache/iceberg/pull/5301

   This PR extends `ManifestGroup` with support for creating custom scan tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923837639


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   I am not super happy with having a builder, as it adds complexity. However, I did it this way so that `ManifestGroup` can keep a loading cache of task factories per spec. Right now, we convert the schema and spec to their JSON representations for each manifest, which is unnecessary. As those JSON strings can get pretty large, I feel doing this once per spec is an important optimization.
   
   If we want to get rid of the builder, then I'll have to implement per-spec caching in each task factory.
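A minimal sketch of the per-spec caching idea described in this comment, under simplifying assumptions: `Spec` and `Factory` are hypothetical stand-ins for `PartitionSpec` and `ScanTaskFactory`, and a `ConcurrentHashMap` with `computeIfAbsent` stands in for whatever loading cache `ManifestGroup` would actually use. The JSON strings are built once per spec ID rather than once per manifest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PerSpecFactoryCache {
  // Hypothetical stand-in for PartitionSpec: a spec ID plus schema fields that are costly to serialize.
  static final class Spec {
    final int specId;
    final String schemaFields;

    Spec(int specId, String schemaFields) {
      this.specId = specId;
      this.schemaFields = schemaFields;
    }
  }

  // Hypothetical stand-in for ScanTaskFactory: holds JSON strings computed once in the constructor.
  static final class Factory {
    final String schemaAsString;
    final String specAsString;

    Factory(Spec spec, AtomicInteger serializations) {
      serializations.incrementAndGet(); // counts how often the "expensive" serialization runs
      this.schemaAsString = "{\"fields\":\"" + spec.schemaFields + "\"}";
      this.specAsString = "{\"spec-id\":" + spec.specId + "}";
    }
  }

  final AtomicInteger serializations = new AtomicInteger();
  // Thread-safe map, since manifests may be planned in parallel.
  private final Map<Integer, Factory> factoriesBySpecId = new ConcurrentHashMap<>();

  // Called once per manifest; only the first manifest of each spec pays for serialization.
  Factory factoryFor(Spec spec) {
    return factoriesBySpecId.computeIfAbsent(spec.specId, id -> new Factory(spec, serializations));
  }
}
```

With this sketch, planning five manifests written with spec 0 and two written with spec 1 would serialize only twice, once per spec.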




[GitHub] [iceberg] flyrain commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929299684


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -156,6 +156,14 @@ ManifestGroup planWith(ExecutorService newExecutorService) {
    * @return a {@link CloseableIterable} of {@link FileScanTask}
    */
   public CloseableIterable<FileScanTask> planFiles() {
+    return plan((entries, ctx) -> CloseableIterable.transform(entries, entry -> {
+      DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
+      DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
+      return new BaseFileScanTask(dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());
+    }));

Review Comment:
   Nit: I'd slightly prefer a named method over an anonymous function for readability. Since the lambda targets a functional interface, a reader needs two steps to figure out the types of its input parameters. Alternatively, we could just use `BiFunction`.
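To illustrate the readability point, here is a hedged sketch that passes a named method as a method reference instead of an inline lambda. `Entry`, `Context`, `Task`, and this `CreateTasksFunction` are simplified, hypothetical stand-ins for `ManifestEntry`, the per-spec context, `FileScanTask`, and the PR's functional interface, and `List` replaces `CloseableIterable` so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

class NamedTaskFunction {
  // Hypothetical simplified stand-ins for the types used in ManifestGroup.
  static final class Entry {
    final String path;

    Entry(String path) {
      this.path = path;
    }
  }

  static final class Context {
    final boolean keepStats;

    Context(boolean keepStats) {
      this.keepStats = keepStats;
    }
  }

  static final class Task {
    final String path;
    final boolean hasStats;

    Task(String path, boolean hasStats) {
      this.path = path;
      this.hasStats = hasStats;
    }
  }

  @FunctionalInterface
  interface CreateTasksFunction<T> {
    List<T> apply(List<Entry> entries, Context ctx);
  }

  // Generic planning entry point that delegates task creation to the supplied function.
  static <T> List<T> plan(List<Entry> entries, Context ctx, CreateTasksFunction<T> createTasksFunc) {
    return createTasksFunc.apply(entries, ctx);
  }

  // Named method: the parameter types are visible right at the declaration.
  static List<Task> createFileScanTasks(List<Entry> entries, Context ctx) {
    List<Task> tasks = new ArrayList<>();
    for (Entry entry : entries) {
      tasks.add(new Task(entry.path, ctx.keepStats));
    }
    return tasks;
  }

  static List<Task> planFiles(List<Entry> entries, Context ctx) {
    return plan(entries, ctx, NamedTaskFunction::createFileScanTasks);
  }
}
```

The thread later mentions a new `createFileScanTasks` method; its actual signature is not shown and may differ from this sketch.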




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923837639


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   I am not super happy with having a builder, as it adds complexity. However, I did it this way so that `ManifestGroup` can keep a loading cache of task factories per spec. Right now, we convert the schema and spec to their JSON representations for each manifest, which is unnecessary. As those JSON strings can get pretty large, I feel doing this once per spec is a reasonable optimization. Planning time is critical.
   
   If we want to get rid of the builder, then I'll have to implement per-spec caching in each task factory.




[GitHub] [iceberg] Reo-LEI commented on pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#issuecomment-1194928665

   +1 for this approach. LGTM.



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923863182


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {
+      private boolean dropStats = false;
+
+      boolean dropStats() {

Review Comment:
   We need to access this field in child builders so that they can pass it to factories. I made it package-private because it was the least open modifier that would work, but I can make it protected if there is a good reason for that. The outer class is package-private too.
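A minimal sketch of the pattern described here: an abstract builder keeps `dropStats` private and exposes a package-private getter, so concrete child builders in the same package can forward the flag to the factory they build. `Factory`, `FileTaskFactory`, and `FileTaskFactoryBuilder` are hypothetical simplifications of the PR's classes. Declaring the getter `protected` would also compile, since `protected` is legal on members of static nested classes; it would only widen access to subclasses in other packages.

```java
class BuilderAccessSketch {
  // Hypothetical simplification of ScanTaskFactory that only tracks the stats flag.
  abstract static class Factory {
    private final boolean dropStats;

    Factory(boolean dropStats) {
      this.dropStats = dropStats;
    }

    boolean shouldKeepStats() {
      return !dropStats;
    }

    abstract static class Builder {
      private boolean dropStats = false;

      // Package-private: the narrowest modifier that still lets same-package child builders read it.
      boolean dropStats() {
        return dropStats;
      }

      Builder dropStats(boolean newDropStats) {
        this.dropStats = newDropStats;
        return this;
      }

      abstract Factory build();
    }
  }

  static final class FileTaskFactory extends Factory {
    FileTaskFactory(boolean dropStats) {
      super(dropStats);
    }
  }

  // Child builder: forwards the inherited flag to the factory it creates.
  static final class FileTaskFactoryBuilder extends Factory.Builder {
    @Override
    Factory build() {
      return new FileTaskFactory(dropStats());
    }
  }
}
```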




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929357206


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   No longer applies.



##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {
+      private boolean dropStats = false;
+
+      boolean dropStats() {

Review Comment:
   No longer applies.




[GitHub] [iceberg] flyrain commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929385100


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -156,6 +156,14 @@ ManifestGroup planWith(ExecutorService newExecutorService) {
    * @return a {@link CloseableIterable} of {@link FileScanTask}
    */
   public CloseableIterable<FileScanTask> planFiles() {
+    return plan((entries, ctx) -> CloseableIterable.transform(entries, entry -> {
+      DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
+      DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
+      return new BaseFileScanTask(dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());
+    }));

Review Comment:
   +1 for the new method `createFileScanTasks`. Thanks @aokolnychyi.




[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r928991682


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   +1 for Yufei's approach. Alternatively, we could construct a cache of `schemaString` per spec and pass the cached `schemaString` to `createTask(CloseableIterable<ManifestEntry<DataFile>> entries, String schemaString)`. But I prefer the former.
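The alternative mentioned here, caching only the serialized schema string per spec and passing it into task creation, could look roughly like the sketch below. `Spec` and `Task` are hypothetical stand-ins, entries are reduced to file path strings, `toJson` merely imitates `SchemaParser.toJson`, and the `createTasks(entries, schemaString)` shape follows the comment's suggestion rather than any merged API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SchemaStringCacheSketch {
  // Hypothetical stand-in for PartitionSpec.
  static final class Spec {
    final int specId;
    final String fields;

    Spec(int specId, String fields) {
      this.specId = specId;
      this.fields = fields;
    }
  }

  // Hypothetical task that carries the shared schema string.
  static final class Task {
    final String path;
    final String schemaString;

    Task(String path, String schemaString) {
      this.path = path;
      this.schemaString = schemaString;
    }
  }

  final AtomicInteger toJsonCalls = new AtomicInteger();
  // Concurrent map because manifests may be planned in parallel.
  private final Map<Integer, String> schemaStringBySpecId = new ConcurrentHashMap<>();

  // Imitates the expensive SchemaParser.toJson call.
  private String toJson(Spec spec) {
    toJsonCalls.incrementAndGet();
    return "{\"fields\":\"" + spec.fields + "\"}";
  }

  // Stateless task creation: the cached string is passed in rather than stored in a factory.
  static List<Task> createTasks(List<String> entryPaths, String schemaString) {
    List<Task> tasks = new ArrayList<>();
    for (String path : entryPaths) {
      tasks.add(new Task(path, schemaString));
    }
    return tasks;
  }

  // Plans one manifest: reuses the schema string for its spec, computing it only on first use.
  List<Task> planManifest(Spec spec, List<String> entryPaths) {
    String schemaString = schemaStringBySpecId.computeIfAbsent(spec.specId, id -> toJson(spec));
    return createTasks(entryPaths, schemaString);
  }
}
```

Compared with caching factory objects, this keeps task creation stateless, at the cost of threading each cached value through every call.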




[GitHub] [iceberg] flyrain commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r924849697


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   Can we just have an inheritable method like `ScanTaskFactory<T extends ScanTask> newInstance(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean newDropStats)`?
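A rough sketch of this suggestion: each factory subclass overrides an inheritable `newInstance` method, so a single prototype can produce per-spec instances without a separate builder hierarchy. `Spec` and the reduced parameter list are hypothetical; the method proposed in the comment would also take a `DeleteFileIndex` and a `ResidualEvaluator`.

```java
class NewInstanceSketch {
  // Hypothetical stand-in for PartitionSpec.
  static final class Spec {
    final int specId;

    Spec(int specId) {
      this.specId = specId;
    }
  }

  abstract static class ScanTaskFactory {
    private final Spec spec;
    private final boolean dropStats;

    ScanTaskFactory(Spec spec, boolean dropStats) {
      this.spec = spec;
      this.dropStats = dropStats;
    }

    // Returns -1 for an unconfigured prototype that has no spec yet.
    int specId() {
      return spec == null ? -1 : spec.specId;
    }

    boolean shouldKeepStats() {
      return !dropStats;
    }

    // Inheritable factory method: each subclass knows how to create a configured instance of itself.
    abstract ScanTaskFactory newInstance(Spec newSpec, boolean newDropStats);
  }

  static final class FileScanTaskFactory extends ScanTaskFactory {
    FileScanTaskFactory(Spec spec, boolean dropStats) {
      super(spec, dropStats);
    }

    @Override
    ScanTaskFactory newInstance(Spec newSpec, boolean newDropStats) {
      return new FileScanTaskFactory(newSpec, newDropStats);
    }
  }
}
```

One trade-off of this design is that the prototype carries state it never uses, which the builder approach avoids.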




[GitHub] [iceberg] aokolnychyi commented on pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#issuecomment-1195752820

   Thanks for reviewing, @RussellSpitzer @flyrain @Reo-LEI!



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929131614


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +289,45 @@ public void close() throws IOException {
           }
         });
   }
+
+  @FunctionalInterface
+  interface CreateTasksFunction<T extends ScanTask> {

Review Comment:
   I could use `BiFunction` here, but the definition would be bulky, so I decided to add a functional interface just to shorten lines.
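To make the trade-off concrete, here is a hedged comparison of the two declarations. `CreateTasksFunction` mirrors the name in the diff, but its parameter list is an assumption based on the `(entries, ctx)` lambda in `planFiles`; `Entry` and `Context` are hypothetical stand-ins, and `List` replaces `CloseableIterable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class FunctionalInterfaceVsBiFunction {
  // Hypothetical stand-ins for ManifestEntry and the per-spec context.
  static final class Entry {
    final String path;

    Entry(String path) {
      this.path = path;
    }
  }

  static final class Context {
    final String name;

    Context(String name) {
      this.name = name;
    }
  }

  // Option 1: a dedicated functional interface keeps every signature that uses it short.
  @FunctionalInterface
  interface CreateTasksFunction<T> {
    List<T> apply(List<Entry> entries, Context ctx);
  }

  static <T> List<T> plan(CreateTasksFunction<T> createTasksFunc, List<Entry> entries, Context ctx) {
    return createTasksFunc.apply(entries, ctx);
  }

  // Option 2: BiFunction needs all three type arguments spelled out wherever it appears.
  static <T> List<T> planWithBiFunction(
      BiFunction<List<Entry>, Context, List<T>> createTasksFunc, List<Entry> entries, Context ctx) {
    return createTasksFunc.apply(entries, ctx);
  }

  // Example task-creation logic; both options accept the same method reference.
  static List<String> describe(List<Entry> entries, Context ctx) {
    List<String> out = new ArrayList<>();
    for (Entry entry : entries) {
      out.add(entry.path + "@" + ctx.name);
    }
    return out;
  }
}
```

With the real `CloseableIterable<ManifestEntry<DataFile>>` parameter types, the `BiFunction` form grows longer still, which is the bulk this comment refers to.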




[GitHub] [iceberg] aokolnychyi commented on pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#issuecomment-1194390754

   @flyrain @Reo-LEI @RussellSpitzer, I've changed the approach to use a functional interface. I feel it is easier than before. Could you take a look?



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923840791


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {
+      private boolean dropStats = false;
+
+      boolean dropStats() {
+        return dropStats;
+      }
+
+      Builder<T> dropStats(boolean newDropStats) {
+        this.dropStats = newDropStats;
+        return this;
+      }
+
+      abstract ScanTaskFactory<T> build(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals);
+    }
+  }
+
+  private static class FileScanTaskFactory extends ScanTaskFactory<FileScanTask> {

Review Comment:
   I'll have more such factories for changelog tasks.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923841747


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);

Review Comment:
   A task factory may create more than one task per entry. That's why I accept an iterable here.
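Because the factory receives the whole iterable of entries rather than one entry at a time, an implementation can emit zero, one, or several tasks per entry. The hedged sketch below illustrates the one-to-many case by splitting each file into fixed-size byte ranges; `Entry`, `SplitTask`, and the split rule are illustrative assumptions, and `List` stands in for `CloseableIterable`.

```java
import java.util.ArrayList;
import java.util.List;

class MultipleTasksPerEntry {
  // Hypothetical stand-in for a manifest entry: a data file path and its size in bytes.
  static final class Entry {
    final String path;
    final long length;

    Entry(String path, long length) {
      this.path = path;
      this.length = length;
    }
  }

  // Hypothetical task covering one byte range of a file.
  static final class SplitTask {
    final String path;
    final long start;
    final long length;

    SplitTask(String path, long start, long length) {
      this.path = path;
      this.start = start;
      this.length = length;
    }
  }

  // Takes the whole batch of entries, so a single entry can expand into several tasks.
  static List<SplitTask> createTasks(List<Entry> entries, long splitSize) {
    if (splitSize <= 0) {
      throw new IllegalArgumentException("splitSize must be positive");
    }
    List<SplitTask> tasks = new ArrayList<>();
    for (Entry entry : entries) {
      // An empty file yields no tasks; a large one yields ceil(length / splitSize) tasks.
      for (long start = 0; start < entry.length; start += splitSize) {
        long len = Math.min(splitSize, entry.length - start);
        tasks.add(new SplitTask(entry.path, start, len));
      }
    }
    return tasks;
  }
}
```

A per-entry `Function<Entry, Task>` could not express this one-to-many case without an extra flattening step.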




[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929474523


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +289,45 @@ public void close() throws IOException {
           }
         });
   }
+
+  @FunctionalInterface
+  interface CreateTasksFunction<T extends ScanTask> {

Review Comment:
   I think using a functional interface is more readable. +1 for this.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923837639


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   I am not super happy with having a builder, as it adds complexity. However, I did it this way so that `ManifestGroup` can keep a loading cache of task factories per spec. Right now, we convert the schema and spec to their JSON representations for each manifest, which is unnecessary. As those JSON strings can get pretty large, I feel doing this once per spec is a reasonable optimization.
   
   If we want to get rid of the builder, then I'll have to implement per-spec caching in each task factory.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923857857


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {
+      private boolean dropStats = false;
+
+      boolean dropStats() {

Review Comment:
   I see this is used by the subclass, so maybe it can just be protected here? Or is that an issue with static classes?
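On the static-class question: Java places no extra restriction on `protected` members of a static nested class, so a subclass can call them like any other inherited protected method. A minimal sketch, with hypothetical names (`ManifestGroupSketch`, `DataTaskBuilder`) and an assumed fluent setter, since only the getter appears in the quoted hunk:

```java
// Demo entry point comes first so the single-file launcher picks it up.
class ProtectedAccessorDemo {
  public static void main(String[] args) {
    ManifestGroupSketch.DataTaskBuilder builder = new ManifestGroupSketch.DataTaskBuilder();
    builder.dropStats(true);
    System.out.println(builder.build()); // prints "keepStats=false"
  }
}

// Hypothetical outer class standing in for ManifestGroup.
class ManifestGroupSketch {
  abstract static class Builder<T> {
    private boolean dropStats = false;

    // Assumed fluent setter (not shown in the quoted hunk).
    Builder<T> dropStats(boolean newDropStats) {
      this.dropStats = newDropStats;
      return this;
    }

    // protected is allowed here: visible to subclasses and to the whole package.
    protected boolean dropStats() {
      return dropStats;
    }

    abstract T build();
  }

  // A subclass nested elsewhere reads the flag through the protected accessor.
  static class DataTaskBuilder extends Builder<String> {
    @Override
    String build() {
      return "keepStats=" + !dropStats();
    }
  }
}
```

Since the factories in this PR presumably live in `org.apache.iceberg` alongside `ManifestGroup`, package-private access already reaches them; `protected` would only widen visibility to subclasses outside the package.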





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r929356959


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -156,6 +156,14 @@ ManifestGroup planWith(ExecutorService newExecutorService) {
    * @return a {@link CloseableIterable} of {@link FileScanTask}
    */
   public CloseableIterable<FileScanTask> planFiles() {
+    return plan((entries, ctx) -> CloseableIterable.transform(entries, entry -> {
+      DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
+      DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
+      return new BaseFileScanTask(dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());
+    }));

Review Comment:
   @flyrain, can you elaborate a bit? I am not sure I understood.
   Do you mean creating an explicit `CreateFileScanTaskFunction` and using it here?
   
   I did not use `BiFunction` to avoid a bulky definition like this:
   
   ```java
   public <T extends ScanTask> CloseableIterable<T> plan(
       BiFunction<CloseableIterable<ManifestEntry<DataFile>>, TaskContext, CloseableIterable<T>> createTasksFunc) {
     ...
   }
   ```
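For comparison, a sketch of the two shapes being discussed: a named functional interface (hypothetical `CreateTasksFunction`) versus a `BiFunction` parameter. Types are simplified to `List<String>` entries and a stub `TaskContext`; these are assumptions made so the example compiles on its own, not the PR's real types.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

class TaskFunctionDemo {
  public static void main(String[] args) {
    Planner planner = new Planner(List.of("a.parquet", "b.parquet"));
    // The same lambda syntax works whether the parameter is a named interface or a BiFunction.
    List<String> tasks = planner.plan((entries, ctx) ->
        entries.stream().map(e -> e + (ctx.shouldKeepStats() ? "+stats" : "")).collect(Collectors.toList()));
    System.out.println(tasks); // [a.parquet+stats, b.parquet+stats]
  }
}

// Hypothetical stub for the task context passed to the function.
final class TaskContext {
  private final boolean keepStats;

  TaskContext(boolean keepStats) {
    this.keepStats = keepStats;
  }

  boolean shouldKeepStats() {
    return keepStats;
  }
}

// Named functional interface: keeps the plan() signature short and self-describing.
@FunctionalInterface
interface CreateTasksFunction<T> {
  List<T> apply(List<String> entries, TaskContext ctx);
}

final class Planner {
  private final List<String> entries;

  Planner(List<String> entries) {
    this.entries = entries;
  }

  <T> List<T> plan(CreateTasksFunction<T> createTasksFunc) {
    return createTasksFunc.apply(entries, new TaskContext(true));
  }

  // Equivalent BiFunction-based variant: same behavior, bulkier parameter type.
  <T> List<T> planWithBiFunction(BiFunction<List<String>, TaskContext, List<T>> createTasksFunc) {
    return createTasksFunc.apply(entries, new TaskContext(true));
  }
}
```

Both variants accept the same lambda at the call site; the trade-off is only in how readable the method signature is.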
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r923855326


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {
+      private boolean dropStats = false;
+
+      boolean dropStats() {

Review Comment:
   Do we need to be able to return the `dropStats` value here? It seems like we could avoid this and not have two `dropStats` methods.
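One possible way to drop the getter, sketched under assumptions: the base builder keeps `dropStats` private, exposes only the fluent setter, and hands the flag to subclasses through a template `build()` method. The names `AbstractFactoryBuilder`, `newFactory`, and `DataTaskFactorySketch` are hypothetical and not what the PR uses.

```java
class SingleDropStatsDemo {
  public static void main(String[] args) {
    DataTaskFactorySketch factory = new DataTaskFactorySketch.Builder().dropStats(true).build();
    System.out.println(factory.shouldKeepStats()); // false
  }
}

// Hypothetical base builder with a single dropStats method (the setter).
abstract class AbstractFactoryBuilder<T> {
  private boolean dropStats = false;

  AbstractFactoryBuilder<T> dropStats(boolean newDropStats) {
    this.dropStats = newDropStats;
    return this;
  }

  // Template method: the flag is passed down, so subclasses never need a getter.
  final T build() {
    return newFactory(dropStats);
  }

  protected abstract T newFactory(boolean shouldDropStats);
}

// Hypothetical concrete factory with its own builder.
final class DataTaskFactorySketch {
  private final boolean dropStats;

  DataTaskFactorySketch(boolean dropStats) {
    this.dropStats = dropStats;
  }

  boolean shouldKeepStats() {
    return !dropStats;
  }

  static final class Builder extends AbstractFactoryBuilder<DataTaskFactorySketch> {
    @Override
    protected DataTaskFactorySketch newFactory(boolean shouldDropStats) {
      return new DataTaskFactorySketch(shouldDropStats);
    }
  }
}
```

The cost of this shape is one extra abstract hook per builder, in exchange for keeping the flag fully encapsulated in the base class.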





[GitHub] [iceberg] aokolnychyi commented on pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#issuecomment-1188287870

   @flyrain @rdblue @stevenzwu @karuppayya @szehon-ho @RussellSpitzer @anuragmantri




[GitHub] [iceberg] aokolnychyi merged pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged PR #5301:
URL: https://github.com/apache/iceberg/pull/5301




[GitHub] [iceberg] flyrain commented on a diff in pull request #5301: Core: Support building custom tasks in ManifestGroup

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5301:
URL: https://github.com/apache/iceberg/pull/5301#discussion_r924851645


##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -279,4 +287,84 @@ public void close() throws IOException {
           }
         });
   }
+
+  abstract static class ScanTaskFactory<T extends ScanTask> {
+    private final String schemaAsString;
+    private final String specAsString;
+    private final DeleteFileIndex deletes;
+    private final ResidualEvaluator residuals;
+    private final boolean dropStats;
+
+    ScanTaskFactory(PartitionSpec spec, DeleteFileIndex deletes, ResidualEvaluator residuals, boolean dropStats) {
+      this.schemaAsString = SchemaParser.toJson(spec.schema());
+      this.specAsString = PartitionSpecParser.toJson(spec);
+      this.deletes = deletes;
+      this.residuals = residuals;
+      this.dropStats = dropStats;
+    }
+
+    abstract CloseableIterable<T> createTasks(CloseableIterable<ManifestEntry<DataFile>> entries);
+
+    String schemaAsString() {
+      return schemaAsString;
+    }
+
+    String specAsString() {
+      return specAsString;
+    }
+
+    DeleteFileIndex deletes() {
+      return deletes;
+    }
+
+    ResidualEvaluator residuals() {
+      return residuals;
+    }
+
+    boolean shouldKeepStats() {
+      return !dropStats;
+    }
+
+    abstract static class Builder<T extends ScanTask> {

Review Comment:
   It is basically a mini builder with less code involved.


