Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/27 20:48:35 UTC

[GitHub] [iceberg] rdblue opened a new pull request #1397: Actions: Speed up expire snapshots action

rdblue opened a new pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397


   This replaces use of the `all_data_files` metadata table in RemoveOrphanFilesAction and ExpireSnapshotsAction with a call to read data file paths from manifest files in parallel. This avoids reading all of the manifest lists in the Spark driver to plan the `all_data_files` scan.
   
   On large tables, this runs much faster with adaptive execution and broadcast joins disabled. Both optimizations use size estimates that are incorrect because the number of data files is much larger than the number of manifests in a table, and Spark does not account for a single row (manifest file) producing thousands or millions of result rows (data files) in a stage.
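   To make the estimate problem concrete, here is a minimal plain-Java sketch (not Spark, and not code from this PR). `ManifestStub` and `FanOutSketch` are hypothetical names used only for illustration. Each stub manifest expands into many data file paths, so the number of output rows is far larger than the number of input rows that a size estimate would be based on.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for a manifest: a path plus the number of data files it lists.
final class ManifestStub {
  final String path;
  final int dataFileCount;

  ManifestStub(String path, int dataFileCount) {
    this.path = path;
    this.dataFileCount = dataFileCount;
  }
}

final class FanOutSketch {
  private FanOutSketch() {
  }

  // Expands each manifest row into one row per data file path, as a flatMap stage would.
  static List<String> readAllPaths(List<ManifestStub> manifests) {
    return manifests.stream()
        .flatMap(m -> IntStream.range(0, m.dataFileCount)
            .mapToObj(i -> m.path + "#file-" + i))
        .collect(Collectors.toList());
  }

  // Ratio of output rows to input rows; an estimate based on input rows misses this factor.
  static double fanOut(List<ManifestStub> manifests) {
    return (double) readAllPaths(manifests).size() / manifests.size();
  }
}
```

   With two stub manifests listing 3 and 5 files, `readAllPaths` yields 8 rows from 2 input rows. On a real table the same ratio can reach thousands or millions, which is the case described above.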


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478713149



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
+    private final Broadcast<FileIO> io;
+
+    ReadManifest(Broadcast<FileIO> io) {
+      this.io = io;
+    }
+
+    @Override
+    public Iterator<String> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    }
+  }
+
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
-    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table()));

Review comment:
       nit: We use `Broadcast<FileIO> io` in all other places.






[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478706641



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+
+public class ManifestFileBean implements ManifestFile {
+  private String path = null;
+  private Long length = null;
+  private Integer partitionSpecId = null;
+  private Long addedSnapshotId = null;
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public Long getLength() {
+    return length;
+  }
+
+  public void setLength(Long length) {
+    this.length = length;
+  }
+
+  public Integer getPartitionSpecId() {
+    return partitionSpecId;
+  }
+
+  public void setPartitionSpecId(Integer partitionSpecId) {
+    this.partitionSpecId = partitionSpecId;
+  }
+
+  public Long getAddedSnapshotId() {
+    return addedSnapshotId;
+  }
+
+  public void setAddedSnapshotId(Long addedSnapshotId) {
+    this.addedSnapshotId = addedSnapshotId;
+  }
+
+  @Override
+  public String path() {
+    return path;
+  }
+
+  @Override
+  public long length() {
+    return length;
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return partitionSpecId;
+  }
+
+  @Override
+  public ManifestContent content() {
+    return ManifestContent.DATA;
+  }
+
+  @Override
+  public long sequenceNumber() {
+    return 0;

Review comment:
       This is actually used by the reader to construct inherited metadata. I think we should just make this class package-private since it is only intended to be used by the action.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478690746



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+
+public class ManifestFileBean implements ManifestFile {

Review comment:
       The DataFile equivalent of this is named SparkDataFile, although it isn't written as a bean, as it uses a `wrap(Row row)` method for applying. I think this naming makes more sense to me, just wanted to note.
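       For readers unfamiliar with the distinction, here is a minimal sketch of what "written as a bean" means, using only the JDK's `java.beans.Introspector`. `BeanShapeSketch` and `PathBean` are hypothetical names, and `PathBean` mirrors only two fields of the class under review. As far as I understand, bean-style encoders rely on this kind of getter/setter discovery, but that is an assumption about Spark internals and is not confirmed in this thread.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

final class BeanShapeSketch {
  private BeanShapeSketch() {
  }

  // Hypothetical, trimmed-down bean mirroring two fields of ManifestFileBean.
  public static class PathBean {
    private String path = null;
    private Long length = null;

    public String getPath() {
      return path;
    }

    public void setPath(String path) {
      this.path = path;
    }

    public Long getLength() {
      return length;
    }

    public void setLength(Long length) {
      this.length = length;
    }

    // Interface-style accessor; not a JavaBean property because it lacks the "get" prefix.
    public String path() {
      return path;
    }
  }

  // Lists the property names that JavaBean introspection finds on the given class.
  static Set<String> beanProperties(Class<?> type) {
    Set<String> names = new TreeSet<>();
    try {
      for (PropertyDescriptor descriptor : Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors()) {
        names.add(descriptor.getName());
      }
    } catch (IntrospectionException e) {
      throw new IllegalStateException("Could not introspect " + type.getName(), e);
    }
    return names;
  }
}
```

       Only `length` and `path` are reported as properties. The `path()` accessor is visible to callers of the interface but plays no part in bean discovery, which is why the class carries both styles of method.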






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478710846



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
+    private final Broadcast<FileIO> io;
+
+    ReadManifest(Broadcast<FileIO> io) {
+      this.io = io;
+    }
+
+    @Override
+    public Iterator<String> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    }
+  }
+
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
-    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table()));
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
+
+    Dataset<ManifestFileBean> allManifests = spark.read().format("iceberg").load(allManifestsMetadataTable)
+        .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
+        .dropDuplicates("path")
+        .as(Encoders.bean(ManifestFileBean.class));
+
+    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");

Review comment:
       Let's add a manual repartition step to avoid surprises with adaptive execution.






[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478723524



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
+    private final Broadcast<FileIO> io;
+
+    ReadManifest(Broadcast<FileIO> io) {
+      this.io = io;
+    }
+
+    @Override
+    public Iterator<String> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    }
+  }
+
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
-    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table()));
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
+
+    Dataset<ManifestFileBean> allManifests = spark.read().format("iceberg").load(allManifestsMetadataTable)
+        .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
+        .dropDuplicates("path")
+        .as(Encoders.bean(ManifestFileBean.class));

Review comment:
       Fixed by adding a repartition.






[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478723354



##########
File path: core/src/main/java/org/apache/iceberg/ManifestFiles.java
##########
@@ -23,15 +23,30 @@
 import java.util.Map;
 import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class ManifestFiles {
   private ManifestFiles() {
   }
 
+  /**
+   * Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
+   *
+   * @param manifest a ManifestFile
+   * @param io a FileIO
+   * @return a manifest reader
+   */
+  public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io) {

Review comment:
       No, because the live entries are the expected case in the API. That's why readers are `Iterable<DataFile>`. Entries and deleted files are hidden through the API.
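       The behavior described here can be sketched in plain Java. This is an illustration under stated assumptions, not the `ManifestFiles` implementation: `LiveEntriesSketch`, `Entry`, and this `readPaths` are hypothetical, and the three statuses follow the manifest entry states (existing, added, deleted). The caller sees only the paths of live entries, with no entry wrapper and no deleted files.

```java
import java.util.List;
import java.util.stream.Collectors;

final class LiveEntriesSketch {
  private LiveEntriesSketch() {
  }

  // Entry states tracked in a manifest; DELETED entries are not live.
  enum Status { EXISTING, ADDED, DELETED }

  // Hypothetical simplified manifest entry: a status plus the data file path.
  static final class Entry {
    final Status status;
    final String path;

    Entry(Status status, String path) {
      this.status = status;
      this.path = path;
    }
  }

  // Returns the paths of live (ADDED or EXISTING) entries only, hiding deleted ones from callers.
  static List<String> readPaths(List<Entry> entries) {
    return entries.stream()
        .filter(e -> e.status != Status.DELETED)
        .map(e -> e.path)
        .collect(Collectors.toList());
  }
}
```

       Because filtering happens inside the reader, the method name does not need to mention "live": callers never observe anything else.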






[GitHub] [iceberg] rdblue commented on pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#issuecomment-682236583


   Thanks for the quick reviews!




[GitHub] [iceberg] aokolnychyi merged pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478744692



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
+    private final Broadcast<FileIO> io;
+
+    ReadManifest(Broadcast<FileIO> io) {
+      this.io = io;
+    }
+
+    @Override
+    public Iterator<String> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    }
+  }
+
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
-    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table()));

Review comment:
       Sorry I missed fixing this. We can update it next time we touch this code.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478712647



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {

Review comment:
       In other actions, we have private static methods at the end of the classes like below:
   
   ```
   private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
         Broadcast<SerializableConfiguration> conf,
         long olderThanTimestamp) {
   
       return (FlatMapFunction<Iterator<String>, String>) dirs -> {
         // logic
       };
     }
   ```
   
   I am OK with this approach too, but it may make sense to align it with other places.








[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478691824



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -57,8 +65,8 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
   }
 
   /**
-   * Returns all the path locations of all Manifest Lists for a given table
-   * @param table the table
+   * Returns all the path locations of all Manifest Lists for a given list of snapshots
+   * @param snapshots snapshots

Review comment:
       I forgot about this when I refactored before! Good catch






[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478722869



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {

Review comment:
       Moved.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478703612



##########
File path: spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -94,9 +102,30 @@ protected String metadataTableName(String tableName, MetadataTableType type) {
     return buildValidDataFileDF(spark, table().toString());
   }
 
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
+    private final Broadcast<FileIO> io;
+
+    ReadManifest(Broadcast<FileIO> io) {
+      this.io = io;
+    }
+
+    @Override
+    public Iterator<String> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    }
+  }
+
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
-    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table()));
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
+
+    Dataset<ManifestFileBean> allManifests = spark.read().format("iceberg").load(allManifestsMetadataTable)
+        .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
+        .dropDuplicates("path")
+        .as(Encoders.bean(ManifestFileBean.class));

Review comment:
       As we discussed a bit, it may make sense to log a warning here for users with "adaptive query on" that they will lose the ability to control the parallelism of the manifest read stage with that parameter enabled. I'm a little torn on whether that is too technical a detail or whether it will trip up lots of real users ...
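       A rough sketch of what such a check could look like, in plain Java with no Spark dependency. `AdaptiveWarningSketch` and `adaptiveWarning` are hypothetical names, the configuration is passed in as a simple map, and treating an unset key as "off" is an assumption that may not match every Spark version. Only the setting name `spark.sql.adaptive.enabled` is taken from Spark itself.

```java
import java.util.Map;
import java.util.Optional;

final class AdaptiveWarningSketch {
  // Spark SQL setting that toggles adaptive query execution.
  static final String ADAPTIVE_ENABLED = "spark.sql.adaptive.enabled";

  private AdaptiveWarningSketch() {
  }

  // Returns a warning message when adaptive execution is on, or empty when it is off or unset.
  static Optional<String> adaptiveWarning(Map<String, String> conf) {
    // Assumption: an unset key means adaptive execution is disabled.
    boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ADAPTIVE_ENABLED, "false"));
    if (!enabled) {
      return Optional.empty();
    }

    return Optional.of(
        "Adaptive execution is enabled (" + ADAPTIVE_ENABLED + "=true); " +
        "the parallelism of the manifest read stage may not follow the configured shuffle partitions");
  }
}
```

       The caller would pass the message to whatever logger the action already uses, for example `adaptiveWarning(conf).ifPresent(LOG::warn)` with an SLF4J-style `LOG`, so that the warning is emitted once per action run.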






[GitHub] [iceberg] rdblue commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478707068



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+
+public class ManifestFileBean implements ManifestFile {
+  private String path = null;
+  private Long length = null;
+  private Integer partitionSpecId = null;
+  private Long addedSnapshotId = null;
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public Long getLength() {
+    return length;
+  }
+
+  public void setLength(Long length) {
+    this.length = length;
+  }
+
+  public Integer getPartitionSpecId() {
+    return partitionSpecId;
+  }
+
+  public void setPartitionSpecId(Integer partitionSpecId) {
+    this.partitionSpecId = partitionSpecId;
+  }
+
+  public Long getAddedSnapshotId() {
+    return addedSnapshotId;
+  }
+
+  public void setAddedSnapshotId(Long addedSnapshotId) {
+    this.addedSnapshotId = addedSnapshotId;
+  }
+
+  @Override
+  public String path() {
+    return path;
+  }
+
+  @Override
+  public long length() {
+    return length;
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return partitionSpecId;
+  }
+
+  @Override
+  public ManifestContent content() {
+    return ManifestContent.DATA;
+  }
+
+  @Override
+  public long sequenceNumber() {
+    return 0;

Review comment:
       Sorry, I just remembered that it can't be or else Spark can't use it.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478705048



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+
+public class ManifestFileBean implements ManifestFile {
+  private String path = null;
+  private Long length = null;
+  private Integer partitionSpecId = null;
+  private Long addedSnapshotId = null;
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public Long getLength() {
+    return length;
+  }
+
+  public void setLength(Long length) {
+    this.length = length;
+  }
+
+  public Integer getPartitionSpecId() {
+    return partitionSpecId;
+  }
+
+  public void setPartitionSpecId(Integer partitionSpecId) {
+    this.partitionSpecId = partitionSpecId;
+  }
+
+  public Long getAddedSnapshotId() {
+    return addedSnapshotId;
+  }
+
+  public void setAddedSnapshotId(Long addedSnapshotId) {
+    this.addedSnapshotId = addedSnapshotId;
+  }
+
+  @Override
+  public String path() {
+    return path;
+  }
+
+  @Override
+  public long length() {
+    return length;
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return partitionSpecId;
+  }
+
+  @Override
+  public ManifestContent content() {
+    return ManifestContent.DATA;
+  }
+
+  @Override
+  public long sequenceNumber() {
+    return 0;

Review comment:
       For this method and the others like it, should we throw `UnsupportedOperationException`, since these values are not actually populated?
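
A minimal sketch of what this suggestion could look like, assuming a simplified stand-in interface. `ManifestInfo` and `ManifestInfoBean` are hypothetical names used only for illustration; they are not the PR's `ManifestFile` or `ManifestFileBean`. Accessors backed by populated fields return them, while an accessor with no backing field fails loudly instead of returning a placeholder such as `0`.

```java
// Hypothetical stand-in for the interface the bean implements; not Iceberg's ManifestFile.
interface ManifestInfo {
  String path();

  long length();

  long sequenceNumber();
}

// Bean-style class: getters and setters exist only for the fields that are populated.
class ManifestInfoBean implements ManifestInfo {
  private String path = null;
  private Long length = null;

  public String getPath() {
    return path;
  }

  public void setPath(String path) {
    this.path = path;
  }

  public Long getLength() {
    return length;
  }

  public void setLength(Long length) {
    this.length = length;
  }

  @Override
  public String path() {
    return path;
  }

  @Override
  public long length() {
    // Unboxes the field; throws NullPointerException if the setter was never called.
    return length;
  }

  @Override
  public long sequenceNumber() {
    // No backing field, so reject the call rather than return a made-up value.
    throw new UnsupportedOperationException("sequenceNumber is not populated in this bean");
  }
}
```

The trade-off is that a caller that touches an unpopulated accessor fails immediately at runtime, which is easier to diagnose than a silently wrong default.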






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478720013



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;

Review comment:
       `SparkDataFile` is in `org.apache.iceberg.spark`. Do we want to have these next to each other?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1397: Actions: Speed up expire snapshots action

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1397:
URL: https://github.com/apache/iceberg/pull/1397#discussion_r478709284



##########
File path: core/src/main/java/org/apache/iceberg/ManifestFiles.java
##########
@@ -23,15 +23,30 @@
 import java.util.Map;
 import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class ManifestFiles {
   private ManifestFiles() {
   }
 
+  /**
+   * Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
+   *
+   * @param manifest a ManifestFile
+   * @param io a FileIO
+   * @return a manifest reader
+   */
+  public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io) {

Review comment:
       Do we want to document or reflect in the name that we return paths for liveEntries only?
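
To illustrate what "live entries only" means here, a self-contained sketch under stated assumptions: `LivePathsSketch`, `Entry`, and `livePaths` are hypothetical names, and the in-memory list stands in for the entries read from a manifest. It does not use Iceberg's reader classes. An entry is treated as live when its status is not `DELETED`, so paths of deleted entries are skipped.

```java
import java.util.ArrayList;
import java.util.List;

class LivePathsSketch {
  // The three states an entry in a manifest can have.
  enum Status { EXISTING, ADDED, DELETED }

  // Hypothetical, simplified manifest entry: a status plus a file path.
  static final class Entry {
    final Status status;
    final String path;

    Entry(Status status, String path) {
      this.status = status;
      this.path = path;
    }
  }

  // Returns the paths of live entries only, i.e. every entry whose status is not DELETED.
  static List<String> livePaths(List<Entry> entries) {
    List<String> paths = new ArrayList<>();
    for (Entry entry : entries) {
      if (entry.status != Status.DELETED) {
        paths.add(entry.path);
      }
    }
    return paths;
  }
}
```

A name such as `readLivePaths`, or a sentence in the Javadoc, would make this filtering visible to callers who might otherwise expect every path recorded in the manifest.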



