Posted to issues@iceberg.apache.org by "abmo-x (via GitHub)" <gi...@apache.org> on 2023/04/17 16:57:47 UTC

[GitHub] [iceberg] abmo-x opened a new pull request, #7363: Add files list partitions optimized

abmo-x opened a new pull request, #7363:
URL: https://github.com/apache/iceberg/pull/7363

   AddFilesProcedure uses InMemoryFileIndex to get SparkPartitions. InMemoryFileIndex is inefficient here, as it lists _all_ the files in a given path to discover partitions, which is memory intensive and unnecessary when we only want to find the partitions. Partition discovery can be achieved without listing all the files.
   
   This PR replaces InMemoryFileIndex with a custom implementation of _PartitioningAwareFileIndex_ which avoids listing _all_ files and only lists directories to discover partitions.
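   
   In essence, the new index walks the directory tree only as deep as the number of partition columns. A condensed, standalone sketch of the recursion (simplified from the diff quoted in the review comments below; `PartitionDirLister` is just a name for this illustration, not a class in the PR):
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   public class PartitionDirLister {
   
     // Recurse only as deep as the number of partition columns; collect the
     // directories at partition depth and never list the files inside them.
     public static List<FileStatus> listLeafDirs(
         Configuration conf, Path path, int numPartitionCols, int level) throws IOException {
       List<FileStatus> leafDirs = new ArrayList<>();
       if (level >= numPartitionCols) {
         return leafDirs;
       }
       FileSystem fs = path.getFileSystem(conf);
       for (FileStatus status : fs.listStatus(path)) {
         if (!status.isDirectory()) {
           continue; // skip data files at intermediate levels
         }
         if (level == numPartitionCols - 1) {
           leafDirs.add(status); // partition depth reached: a leaf partition dir
         } else {
           leafDirs.addAll(listLeafDirs(conf, status.getPath(), numPartitionCols, level + 1));
         }
       }
       return leafDirs;
     }
   }
   ```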
   
   This optimization reduces _listPartitions_ latency by more than 90% and memory usage by more than 3x.
   
   Addresses Issue: https://github.com/apache/iceberg/issues/7027
   
   Tested with an input S3 folder containing a large number of files.
   
   All files in path:
   Filtered on date and hour, where a single partition has 41,417 files
   
   | Cluster Config  | Before |  After     |  Improvement     |
   | ------------- | ------------- | ------------- | ------------- |
   | Driver Memory | 128GB (Fails with OOM if < 128GB)  | 8GB | ~93% |
   | Executor Memory  | 64GB  | 8GB | ~87% |
   | Num of Executors  | 20 |4  | ~80% |
   | List Partitions Latency  | 12M49.459S  | 0.774S  | ~99% |
   
   Latency to add the files in all partitions once they are listed is essentially unchanged:
   
   | Before | After |
   | -- | -- |
   | 2H25M14.905S | 2H26M15.803S |
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169158840


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
       return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
     }
   }
+
+  /**
+   * Implement our own in-memory index which will only list directories to avoid unnecessary
+   * file listings. Should ONLY be used to get partition directory paths. Uses table's schema to
+   * only visit partition dirs using number of partition columns depth recursively. Does NOT return
+   * files within leaf dir.
+   */
+  private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {
+
+    private final Path rootPath;
+    private final FileStatusCache fileStatusCache;
+    private final SparkSession sparkSession;
+    private final StructType userSpecifiedSchema;
+    private LinkedHashMap<Path, FileStatus> cachedLeafFiles;
+    private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles;
+    private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec;
+
+    public InMemoryLeafDirOnlyIndex(
+        SparkSession sparkSession,
+        scala.collection.immutable.Map<String, String> parameters,
+        StructType userSpecifiedSchema,
+        FileStatusCache fileStatusCache,
+        Path rootPath) {
+      super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache);
+      this.fileStatusCache = fileStatusCache;
+      this.rootPath = rootPath;
+      this.sparkSession = sparkSession;
+      this.userSpecifiedSchema = userSpecifiedSchema;
+    }
+
+    @Override
+    public scala.collection.Seq<Path> rootPaths() {
+      return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq();
+    }
+
+    @Override
+    public void refresh() {
+      fileStatusCache.invalidateAll();
+      cachedLeafFiles = null;
+      cachedLeafDirToChildFiles = null;
+      cachedPartitionSpec = null;
+    }
+
+    @Override
+    public org.apache.spark.sql.execution.datasources.PartitionSpec partitionSpec() {
+      if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning();
+      }
+      log.trace("Partition spec: {}", cachedPartitionSpec);
+      return cachedPartitionSpec;
+    }
+
+    @Override
+    public LinkedHashMap<Path, FileStatus> leafFiles() {
+      if (cachedLeafFiles == null) {
+        try {
+          List<FileStatus> fileStatuses =
+              listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0);
+          LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>();
+          for (FileStatus fs : fileStatuses) {
+            map.put(fs.getPath(), fs);
+          }
+          cachedLeafFiles = map;
+        } catch (IOException e) {
+          throw new RuntimeException("error listing files for path=" + rootPath, e);
+        }
+      }
+      return cachedLeafFiles;
+    }
+
+    static List<FileStatus> listLeafDirs(
+        SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException {
+      List<FileStatus> leafDirs = new ArrayList<>();
+      int numPartitionCols = partitionSpec.fields().length;
+      if (level < numPartitionCols) {
+        try (FileSystem fs = path.getFileSystem(spark.sparkContext().hadoopConfiguration())) {
+          List<FileStatus> dirs =
+              Stream.of(fs.listStatus(path))
+                  .filter(FileStatus::isDirectory)
+                  .collect(Collectors.toList());
+          for (FileStatus dir : dirs) {
+            // stop recursive call once we reach the expected end of partitions as per table schema
+            if (level == numPartitionCols - 1) {
+              leafDirs.add(dir);
+            } else {
+              leafDirs.addAll(listLeafDirs(spark, dir.getPath(), partitionSpec, level + 1));
+            }
+          }
+        }
+      }
+      return leafDirs;
+    }
+
+    @Override
+    public scala.collection.immutable.Map<Path, FileStatus[]> leafDirToChildrenFiles() {
+      if (cachedLeafDirToChildFiles == null) {
+        List<Tuple2<Path, FileStatus[]>> tuple2s =
+            JavaConverters.seqAsJavaList(leafFiles().values().toSeq()).stream()
+                .map(
+                    fileStatus -> {
+                      // Create an empty data file in the leaf dir.
+                      // As this index is only used to list partition directories,
+                      // we can stop listing the leaf dir to avoid unnecessary listing which can
+                      // take a while on folders with 1000s of files
+                      return new Tuple2<>(
+                          fileStatus.getPath(),
+                          new FileStatus[] {createEmptyChildDataFileStatus(fileStatus)});
+                    })
+                .collect(Collectors.toList());
+        cachedLeafDirToChildFiles =
+            (scala.collection.immutable.Map<Path, FileStatus[]>)
+                Map$.MODULE$.apply(JavaConverters.asScalaBuffer(tuple2s));
+      }
+      return cachedLeafDirToChildFiles;
+    }
+
+    private FileStatus createEmptyChildDataFileStatus(FileStatus fileStatus) {
+      return new FileStatus(
+          1L,
+          false,
+          fileStatus.getReplication(),
+          1L,
+          fileStatus.getModificationTime(),
+          fileStatus.getAccessTime(),
+          fileStatus.getPermission(),
+          fileStatus.getOwner(),
+          fileStatus.getGroup(),
+          new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile"));
+    }

Review Comment:
   Yeah, IntelliJ adds that automatically now, so it's not obvious anymore.
   
   Will add it so it's useful for other IDEs.





[GitHub] [iceberg] abmo-x commented on pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#issuecomment-1513851249

   Had to create a Scala class, as this PR extends a base Scala class in Spark. When I created it as a Java class, compilation failed with Scala 2.13.




[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171709257


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols
+   * from user specified schema.
+   * @param path
+   * @param partitionIndex
+   * @return
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val startTime = System.nanoTime()
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val numPartitions = userSpecifiedSchema.fields.length
+    if (partitionIndex < numPartitions) {
+      path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        .listStatus(path)
+        .filter(f => f.isDirectory)
+        .foreach(f => {
+          if (partitionIndex == numPartitions -1) {
+            output.add(f)
+          } else {
+            output ++= listLeafDirs(f.getPath, partitionIndex + 1)
+          }
+        })
+    }
+    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf dirs" +

Review Comment:
   I don't think so, as this will always be in ms/nanos. Will remove it.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171708861


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -170,7 +173,8 @@ private void importFileTable(
 
     if (table.spec().isUnpartitioned()) {
       Preconditions.checkArgument(
-          partitions.isEmpty(), "Cannot add partitioned files to an unpartitioned table");
+          !Spark3Util.isPartitioned(spark(), tableLocation),

Review Comment:
   As we now rely on the table's partitionSpec to get the partition paths, `partitions.isEmpty` will always be true for an unpartitioned table. Hence we need this check for unpartitioned tables to quickly verify whether the path contains any directories.
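   
   For illustration, a minimal sketch of what such a quick check could look like (the actual `Spark3Util.isPartitioned` body is not quoted in this thread, and it takes a SparkSession rather than a bare Configuration, so treat this as an approximation):
   
   ```java
   import java.io.IOException;
   import java.util.Arrays;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.Path;
   
   public class PartitionedPathCheck {
   
     // A path is treated as partitioned if it contains at least one
     // subdirectory; a single non-recursive listing is enough to decide.
     public static boolean isPartitioned(Configuration conf, String location) throws IOException {
       Path path = new Path(location);
       FileStatus[] entries = path.getFileSystem(conf).listStatus(path);
       return Arrays.stream(entries).anyMatch(FileStatus::isDirectory);
     }
   }
   ```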





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169159587


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as quoted in the first review comment above]

Review Comment:
   (screenshot: https://user-images.githubusercontent.com/69539469/232584235-b3354106-a1ce-4a49-835b-3aeef349f232.png)
   





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1188017981


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   If I understand it correctly, the original method here is parallel (it may launch a Spark job), but this changes it to a single thread. Is there any way to use more of the existing code, i.e. by passing a filter to the original bulkList methods?





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169153191


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as quoted in the first review comment above]

Review Comment:
   Maybe add a comment beside the boolean parameter to help with readability? https://github.com/apache/iceberg/blob/master/CONTRIBUTING.md#boolean-arguments
   ```java
         return new FileStatus(
             1L,
             false, /* is directory */
             fileStatus.getReplication(),
             1L,
             fileStatus.getModificationTime(),
             fileStatus.getAccessTime(),
             fileStatus.getPermission(),
             fileStatus.getOwner(),
             fileStatus.getGroup(),
             new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile"));
       }
   ```





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169168257


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+      List<FileStatus> leafDirs = new ArrayList<>();

Review Comment:
   Addressed all checkstyle issues.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+    public InMemoryLeafDirOnlyIndex(

Review Comment:
   Addressed all checkstyle issues.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171790580


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
   [snip: same InMemoryPartitionPathsIndex.scala hunk as quoted above, down to]
+  private def createEmptyChildDataFileStatus(fs: FileStatus) =
+    new FileStatus(1L,
+      false,
+      fs.getReplication,
+      1L,
+      fs.getModificationTime,
+      fs.getAccessTime,
+      fs.getPermission,
+      fs.getOwner,
+      fs.getGroup,
+      new Path(fs.getPath, fs.getPath.toString + "/dummyDataFile"))

Review Comment:
   Yes, thanks.





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169159619


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+    public InMemoryLeafDirOnlyIndex(

Review Comment:
   I think this `public` can be omitted
   
   ```
   Error: eckstyle] [ERROR] /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:1047:5: Redundant 'public' modifier. [RedundantModifier]
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+      List<FileStatus> leafDirs = new ArrayList<>();

Review Comment:
   Checkstyle seems to prefer `Lists.newArrayList()`.
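   
   For reference, a short sketch of the preferred form (assuming the relocated Guava package Iceberg bundles for collections):
   
   ```java
   import java.util.List;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   
   class LeafDirListExample {
     // Iceberg checkstyle flags `new ArrayList<>()` in favor of the
     // relocated Guava factory method.
     List<FileStatus> newLeafDirList() {
       return Lists.newArrayList();
     }
   }
   ```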



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+  private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {

Review Comment:
   Yeah, I guess I'm just curious about the customization of `leafDirToChildrenFiles`, but I have to admit there's no easy way to test this.
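   
   One hypothetical way to cover it would be a small local-filesystem test of the directory-only listing (a sketch, not PR code; `PartitionDirLister` is the illustrative helper from the PR description above, and JUnit 4 is assumed):
   
   ```java
   import static org.junit.Assert.assertEquals;
   
   import java.io.File;
   import java.nio.file.Files;
   import java.util.List;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.Path;
   import org.junit.Test;
   
   public class TestLeafDirListing {
   
     @Test
     public void listsOnlyLeafPartitionDirs() throws Exception {
       File root = Files.createTempDirectory("tbl").toFile();
       new File(root, "date=2023-04-17/hour=00").mkdirs();
       new File(root, "date=2023-04-17/hour=01").mkdirs();
       // A data file inside a leaf partition must not show up in the result.
       new File(root, "date=2023-04-17/hour=00/part-00000.parquet").createNewFile();
   
       List<FileStatus> leaves = PartitionDirLister.listLeafDirs(
           new Configuration(), new Path(root.toURI()), 2 /* partition columns */, 0);
   
       assertEquals(2, leaves.size()); // hour=00 and hour=01 only
     }
   }
   ```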





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169158275


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
   [snip: same Spark3Util.java hunk as above, quoted down to]
+  private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {

Review Comment:
   Existing add_files tests cover this. Added two more tests for other edge cases as well.





Re: [PR] Spark 3.2: Optimized add_files procedure's listPartitions [iceberg]

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat closed pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions
URL: https://github.com/apache/iceberg/pull/7363




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1188017981


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   If I understand it correctly, the original method here is parallel (it may launch a Spark job), while this changes it to a single thread.  Is there any way to reuse more of the existing code, i.e. by passing a filter to the original bulkList methods?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1189039431


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   @szehon-ho 
   Correct, the original method may distribute the listing task across the cluster.
   
   However, since our implementation does **not** list files and _only_ lists directories, we don't need parallelism: there are far fewer directories than files, so listing just the dirs is much quicker.
   
   As seen from benchmarking:
   
   Tested with an input S3 folder with a large number of files.
   **All files in path**: 7,169,419
   **Total size**: 6.7 TB
   
   Filter on date and hour, where a single partition has 41,417 files
   
   | Cluster Config  | Before |  After     |
   | ------------- | ------------- | ------------- |
   | List Partitions Latency | 12M49.459S | 0.774S |
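   
   (For scale, assuming a Hive-style date/hour layout: even a full year of hourly partitions is only ~8,760 leaf directories, discovered with a few hundred listStatus calls, versus statting 7M+ individual files.)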
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1192623988


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols
+   * from user specified schema.
+   * @param path
+   * @param partitionIndex
+   * @return
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val output = mutable.LinkedHashSet[FileStatus]()

Review Comment:
   I will add that back



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171675982


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -494,6 +500,16 @@ public static boolean extensionsEnabled(SparkSession spark) {
     return extensions.contains("IcebergSparkSessionExtensions");
   }
 
+  public static boolean isPartitioned(SparkSession spark, Path tableLocation) {
+    try {
+      return Arrays.stream(
+              FileSystem.get(spark.sessionState().newHadoopConf()).listStatus(tableLocation))

Review Comment:
   yes, fixed. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1191552736


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   Thanks for benchmarking.
   
   I'm still not entirely sure why the old code takes 12 min vs the new code being < 1 second?   My thought is, was the old code spawning a Spark job?  The InMemoryFileIndex list seems to also call HadoopFsUtils.parallelListLeafFiles, which seems to do a similar thing under the hood (fs.list), as long as the parallel threshold was not hit.
   
   Yea, about code re-use, I'm not entirely sure about all the Hadoop-side FS stuff.  While this may work on S3, there looks to be some special handling for particular Hadoop FileSystems inside HadoopFsUtils.parallelListLeafFiles, which I fear we may not handle.  Hence the suggestion to call the existing code HadoopFsUtils.parallelListLeafFiles: we can pass a directory filter, and also set the parallelism threshold to always disable parallelism, though I may be missing some reason why this is not the same.
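   
   For instance, something like this (untested sketch; `spark.sql.sources.parallelPartitionDiscovery.threshold` is the existing Spark config that decides when listing is distributed as a job, and the value here is only illustrative):
   
       // keep partition discovery on the driver by raising the parallel-listing threshold
       spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", Int.MaxValue.toString)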
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1192623070


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   Regarding `HadoopFsUtils.parallelListLeafFiles`: we cannot use that with a filter, because the filter is applied _after_ the leaf files are listed, which is exactly what we are trying to avoid in this PR:
   
         val filteredTopLevelFiles = if (filter != null) {



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169136757


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -49,9 +49,12 @@
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AddFilesProcedure extends BaseProcedure {
 
+  private static Logger LOG = LoggerFactory.getLogger(AddFilesProcedure.class);

Review Comment:
   Yes, it should have been `log`. Fixed, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1189040649


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   @szehon-ho 
   Regarding reusing the existing code: I tried that, but unfortunately the InMemoryFileIndex constructor calls refresh -> listFiles,
   and due to the current implementation it doesn't allow overriding them.
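   
   A minimal analog of the problem (illustrative only, not the actual Spark source): the listing is kicked off from a private method during superclass construction, so a subclass has no way to replace it and still pays for the full file listing.
   
       abstract class BaseIndex {
         listEverything() // runs while the superclass is being constructed
         private def listEverything(): Unit = println("listing ALL leaf files...")
       }
       class DirOnlyIndex extends BaseIndex // cannot override the private call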



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1194367942


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   OK, makes sense.  I was looking the other day, and we have custom code in DeleteFileSparkAction.listFilesRecursively.  I'm a bit behind and will need to take another look later this week, unless someone else here has more time first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] karuppayya commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "karuppayya (via GitHub)" <gi...@apache.org>.
karuppayya commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1170629638


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########


Review Comment:
   Do we need this in 3.3 as well?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -848,16 +864,13 @@ public static List<SparkPartition> getPartitions(
             : Option.apply(
                 SparkSchemaUtil.convert(new Schema(partitionSpec.partitionType().fields())));
 
-    InMemoryFileIndex fileIndex =
-        new InMemoryFileIndex(
+    PartitioningAwareFileIndex fileIndex =

Review Comment:
   Fix the comment [here](https://github.com/apache/iceberg/pull/7363/files#diff-08b2e534245d69d61b7fd677a97107855c8cba2b51e1f8a58dffbffa5f52d73cL167)?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -170,7 +173,8 @@ private void importFileTable(
 
     if (table.spec().isUnpartitioned()) {
       Preconditions.checkArgument(
-          partitions.isEmpty(), "Cannot add partitioned files to an unpartitioned table");
+          !Spark3Util.isPartitioned(spark(), tableLocation),

Review Comment:
   `partitions.isEmpty()` will also return true when partition predicates filter out every partition. Is that why we have changed this condition?



##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols
+   * from user specified schema.
+   * @param path
+   * @param partitionIndex
+   * @return
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val startTime = System.nanoTime()
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val numPartitions = userSpecifiedSchema.fields.length
+    if (partitionIndex < numPartitions) {
+      path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        .listStatus(path)
+        .filter(f => f.isDirectory)
+        .foreach(f => {
+          if (partitionIndex == numPartitions -1) {
+            output.add(f)
+          } else {
+            output ++= listLeafDirs(f.getPath, partitionIndex + 1)
+          }
+        })
+    }
+    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf dirs" +
+      s" for path ${path}.")
+    output
+  }
+
+  private def createEmptyChildDataFileStatus(fs: FileStatus) =
+    new FileStatus(1L,
+      false,
+      fs.getReplication,
+      1L,
+      fs.getModificationTime,
+      fs.getAccessTime,
+      fs.getPermission,
+      fs.getOwner,
+      fs.getGroup,
+      new Path(fs.getPath, fs.getPath.toString + "/dummyDataFile"))

Review Comment:
   Should this just be `new Path(fs.getPath, "dummyDataFile")`?
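   
   (Sketch of the difference, assuming I read Hadoop's API right: `Path(Path parent, String child)` resolves the child against the parent, so the parent path should not be repeated inside the child string.)
   
       import org.apache.hadoop.fs.Path
       val parent = new Path("s3://bucket/table/dt=2023-04-17")
       val leaf = new Path(parent, "dummyDataFile") // s3://bucket/table/dt=2023-04-17/dummyDataFile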



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -494,6 +500,16 @@ public static boolean extensionsEnabled(SparkSession spark) {
     return extensions.contains("IcebergSparkSessionExtensions");
   }
 
+  public static boolean isPartitioned(SparkSession spark, Path tableLocation) {
+    try {
+      return Arrays.stream(
+              FileSystem.get(spark.sessionState().newHadoopConf()).listStatus(tableLocation))

Review Comment:
   Should we use this `org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)`
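   
   i.e. something like this (untested Scala sketch of the same call; `tableLocation` is the Hadoop `Path` from the surrounding code):
   
       val fs = FileSystem.get(tableLocation.toUri, spark.sessionState.newHadoopConf())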



##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles

Review Comment:
   Should we throw an exception here?
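   
   e.g. something like (sketch, reusing the signature from above):
   
       override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] =
         throw new UnsupportedOperationException(
           "InMemoryPartitionPathsIndex lists partition directories only, not leaf files")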



##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols
+   * from user specified schema.
+   * @param path
+   * @param partitionIndex
+   * @return
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val startTime = System.nanoTime()
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val numPartitions = userSpecifiedSchema.fields.length
+    if (partitionIndex < numPartitions) {
+      path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        .listStatus(path)
+        .filter(f => f.isDirectory)
+        .foreach(f => {
+          if (partitionIndex == numPartitions -1) {
+            output.add(f)
+          } else {
+            output ++= listLeafDirs(f.getPath, partitionIndex + 1)
+          }
+        })
+    }
+    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf dirs" +

Review Comment:
   Do we want to log this for every recursive call?





[GitHub] [iceberg] abmo-x commented on pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#issuecomment-1513871426

   @szehon-ho @aokolnychyi 
   Can you review when you get a chance? Thanks!
   
   cc @rdblue @danielcweeks 




[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1192623070


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   Regarding `HadoopFSUtils.parallelListLeafFiles`, we cannot use that with a filter because the filter is applied _after_ the leaf files are listed, which is exactly what we are trying to avoid in this PR.
   https://github.com/apache/spark/blob/46332d985e52f76887c139f67d1471b82e85d5ca/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L239
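   
   For illustration, here is a minimal sketch (assuming a plain Hadoop `FileSystem`; the method name is hypothetical) of the directory-only walk this PR does instead, which never enumerates the data files inside the leaf partition dirs:
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileStatus, Path}
   
   // Walk `depth` levels below `root`, keeping only directories; the data
   // files inside the leaf partition dirs are never listed.
   def listPartitionDirs(root: Path, depth: Int, conf: Configuration): Seq[FileStatus] = {
     val fs = root.getFileSystem(conf)
     val dirs = fs.listStatus(root).filter(_.isDirectory).toSeq
     if (depth <= 1) dirs
     else dirs.flatMap(d => listPartitionDirs(d.getPath, depth - 1, conf))
   }
   ```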





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1192628819


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   https://github.com/apache/spark/blob/46332d985e52f76887c139f67d1471b82e85d5ca/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L174
   
   HadoopFSUtils.parallelListLeafFiles has special code only if the file system is a distributed/view FS, which will not be the case for add_files. I think we should be fine doing `fs.listStatus` on the Hadoop FS, and it should handle non-S3 file systems as well. Thoughts?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1191552736


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   Thanks for benchmarking.  
   
   I'm still not entirely sure why the old code takes 12 min vs the new code taking < 1 second. My thought is, was the old code spawning a Spark job? The InMemoryFileIndex listing also seems to call HadoopFSUtils.parallelListLeafFiles, which seems to do a similar thing under the hood (fs.list), as long as the parallel threshold was not exceeded.
   
   Yeah, about code re-use, I'm not entirely sure about all the Hadoop-side FS handling. While this may work on S3, there looks to be some special handling for particular Hadoop FileSystem implementations inside HadoopFSUtils.parallelListLeafFiles, which I fear we may not handle. Hence the suggestion to call the existing HadoopFSUtils.parallelListLeafFiles.
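   
   For reference, that driver-side fallback is gated by a Spark SQL config; a sketch (the property name is from Spark's SQLConf, and the default is 32 to the best of my knowledge):
   
   ```scala
   // Below this number of paths, InMemoryFileIndex lists on the driver
   // instead of launching a distributed Spark job.
   spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
   ```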
   
   





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171702582


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles

Review Comment:
   We cannot, as this is used by partitionSpec -> inferPartitioning -> leafFiles to get the partition paths.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169157083


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -165,12 +168,16 @@ private void importFileTable(
       boolean checkDuplicateFiles,
       PartitionSpec spec) {
     // List Partitions via Spark InMemory file search interface
+    long start = System.currentTimeMillis();
     List<SparkPartition> partitions =
         Spark3Util.getPartitions(spark(), tableLocation, format, partitionFilter, spec);
+    LOG.info("found {} partitions in {}", partitions.size(), Spark3Util.duration(start));

Review Comment:
   Yeah, meant to remove them. Removed.





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169132446


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -49,9 +49,12 @@
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AddFilesProcedure extends BaseProcedure {
 
+  private static Logger LOG = LoggerFactory.getLogger(AddFilesProcedure.class);

Review Comment:
   I think checkstyle is not happy about the variable name:
   ```
   Error: [checkstyle] [ERROR] /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:57:25: Name 'LOG' must match pattern '^[a-z][a-zA-Z0-9]*$'. [StaticVariableName]
   ```
   
   maybe `logger` instead?





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171653950


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########


Review Comment:
   Yes, will create a separate PR for Spark 3.3.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1171790187


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -848,16 +864,13 @@ public static List<SparkPartition> getPartitions(
             : Option.apply(
                 SparkSchemaUtil.convert(new Schema(partitionSpec.partitionType().fields())));
 
-    InMemoryFileIndex fileIndex =
-        new InMemoryFileIndex(
+    PartitioningAwareFileIndex fileIndex =

Review Comment:
   Done.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1192617980


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   @szehon-ho the old code does spawn a Spark job based on the parallelism threshold, which was not met in my benchmark. Parallelism makes sense for listLeafFiles, as the work gets distributed to executors for each folder/partition to list the files. But in our case, since we don't list _files_ but only dirs, I don't think we need parallelism or a Spark job just to list dirs.





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1189039431


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   @szehon-ho 
   Correct, the original method may distribute the task of listing files.
   
   However, since our implementation does **not** list files and _only_ lists directories, we don't need parallelism: there are far fewer directories than files, so listing just the dirs is quick compared to listing the files in each dir.





[GitHub] [iceberg] huaxingao commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "huaxingao (via GitHub)" <gi...@apache.org>.
huaxingao commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1189152990


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols

Review Comment:
   @szehon-ho Thanks for pinging me. Seems to me there is no easy way to reuse the existing code. 





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1191553921


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath :Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map( f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * recursively lists only the partition dirs. Uses the number of partition cols
+   * from user specified schema.
+   * @param path
+   * @param partitionIndex
+   * @return
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val output = mutable.LinkedHashSet[FileStatus]()

Review Comment:
   We probably also want to emit a similar timing log here, as InMemoryFileIndex did.
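   
   Something like the following sketch (using the names from the diff above; purely illustrative) would emit one log line for the whole walk instead of one per recursive call:
   
   ```scala
   // Wrap the recursive walk once so the timing log is emitted a single time.
   private def listAllLeafDirs(): mutable.LinkedHashSet[FileStatus] = {
     val startTime = System.nanoTime()
     val output = listLeafDirs(rootPath, 0)
     logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms " +
       s"to list ${output.size} leaf dirs under $rootPath.")
     output
   }
   ```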





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169157792


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -80,18 +86,24 @@
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.execution.datasources.FileStatusCache;
-import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
 import org.apache.spark.sql.execution.datasources.PartitionDirectory;
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.Tuple2;
 import scala.collection.JavaConverters;
+import scala.collection.immutable.Map$;
 import scala.collection.immutable.Seq;
+import scala.collection.mutable.LinkedHashMap;
 
 public class Spark3Util {
 
+  private static Logger log = LoggerFactory.getLogger(Spark3Util.class);

Review Comment:
   `log` is used in the code base.





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169135730


##########
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java:
##########
@@ -102,13 +104,19 @@ public static List<DataFile> listPartition(
 
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
-      List<FileStatus> fileStatus =
-          Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-              .filter(FileStatus::isFile)
-              .collect(Collectors.toList());
-      DataFile[] datafiles = new DataFile[fileStatus.size()];
+      FileStatus[] fileStatuses = fs.listStatus(partition, HIDDEN_PATH_FILTER);
+      List<FileStatus> files = Lists.newArrayList();

Review Comment:
   nit: maybe keep the original name `fileStatuses` for the List and give the array a new name, to avoid all those rename changes below? I think the array is only used once here.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -165,12 +168,16 @@ private void importFileTable(
       boolean checkDuplicateFiles,
       PartitionSpec spec) {
     // List Partitions via Spark InMemory file search interface
+    long start = System.currentTimeMillis();
     List<SparkPartition> partitions =
         Spark3Util.getPartitions(spark(), tableLocation, format, partitionFilter, spec);
+    LOG.info("found {} partitions in {}", partitions.size(), Spark3Util.duration(start));

Review Comment:
   I think such instrumentation is helpful when comparing performance, but do you think it's needed for all imports?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -494,6 +506,20 @@ public static boolean extensionsEnabled(SparkSession spark) {
     return extensions.contains("IcebergSparkSessionExtensions");
   }
 
+  public static boolean isPartitioned(SparkSession spark, Path tableLocation) {
+    try {
+      return Arrays.stream(
+              FileSystem.get(spark.sparkContext().hadoopConfiguration()).listStatus(tableLocation))

Review Comment:
   Checkstyle favors `spark.sessionState().newHadoopConf()` over `spark.sparkContext().hadoopConfiguration()`, since session-level configs are not lost.
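
   For reference, a small sketch of the difference (in Scala, matching the index code in this PR; note `sessionState` is `private[sql]`, so this compiles only from an `org.apache.spark.sql` package, while the Java call sites reach it as `spark.sessionState()`; `spark` and `tableLocation` are assumed to be in scope):
   
   ```scala
   // newHadoopConf() layers session-level overrides (spark.hadoop.* settings,
   // SET commands) on top of the SparkContext configuration, so they are kept.
   val conf = spark.sessionState.newHadoopConf()
   // By contrast, the raw SparkContext conf drops session-level settings:
   // val conf = spark.sparkContext.hadoopConfiguration
   val fs = tableLocation.getFileSystem(conf)
   ```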



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -80,18 +86,24 @@
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.execution.datasources.FileStatusCache;
-import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
 import org.apache.spark.sql.execution.datasources.PartitionDirectory;
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.Tuple2;
 import scala.collection.JavaConverters;
+import scala.collection.immutable.Map$;
 import scala.collection.immutable.Seq;
+import scala.collection.mutable.LinkedHashMap;
 
 public class Spark3Util {
 
+  private static Logger log = LoggerFactory.getLogger(Spark3Util.class);

Review Comment:
   nit: maybe `logger`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
       return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
     }
   }
+
+  /**
+   * Implement our own in-memory index which will only list directories to avoid unnecessary
+   * file listings. Should ONLY be used to get partition directory paths. Uses table's schema to
+   * only visit partition dirs using number of partition columns depth recursively. Does NOT return
+   * files within leaf dir.
+   */
+  private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {

Review Comment:
   shall we add any test for this?
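
   A hypothetical shape for such a test, using the Scala `InMemoryPartitionPathsIndex` that a later revision of this PR introduces (names here are illustrative; a SparkSession `spark` is assumed from the test base class): create a two-level hive-style layout with empty leaf dirs and assert the index discovers exactly the leaf partition dirs without listing any files.
   
   ```scala
   import java.nio.file.Files
   
   import org.apache.hadoop.fs.Path
   import org.apache.spark.sql.execution.datasources.InMemoryPartitionPathsIndex
   import org.apache.spark.sql.types.{StringType, StructField, StructType}
   
   val tmp = Files.createTempDirectory("add_files_list_partitions")
   Seq("date=2023-01-01/hour=00", "date=2023-01-01/hour=01", "date=2023-01-02/hour=00")
     .foreach(p => Files.createDirectories(tmp.resolve(p)))
   
   val partitionSchema = StructType(Seq(
     StructField("date", StringType), StructField("hour", StringType)))
   val index = new InMemoryPartitionPathsIndex(
     spark, new Path(tmp.toUri.toString), Map.empty, partitionSchema)
   
   // One inferred PartitionPath per leaf dir, from the hive-style dir names.
   assert(index.partitionSpec().partitions.size == 3)
   ```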





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1194368682


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath: Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map(f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * Recursively lists only the partition dirs, using the number of partition cols
+   * from the user-specified schema.
+   * @param path the directory to list
+   * @param partitionIndex index of the partition column at the current depth
+   * @return the leaf partition directory statuses found under the given path
+   */
+  private def listLeafDirs(path: Path, partitionIndex: Int): mutable.LinkedHashSet[FileStatus] = {
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val numPartitions = userSpecifiedSchema.fields.length
+    if (partitionIndex < numPartitions) {
+      path.getFileSystem(sparkSession.sessionState.newHadoopConf())

Review Comment:
   Is there anything better we can do than making a new conf every time? (i.e., can this be a field on the class?)
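
   One possible shape for that, as a sketch (not the PR's current code): hoist the conf into a lazy field on the index so each recursive `listLeafDirs` call reuses the same `Configuration`.
   
   ```scala
   // Built once per index instance instead of once per directory visited.
   private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf()
   
   // ...then inside listLeafDirs:
   //   path.getFileSystem(hadoopConf).listStatus(path)
   ```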





[GitHub] [iceberg] abmo-x commented on a diff in pull request #7363: Spark 3.2: Optimized add_files procedure's listPartitions

Posted by "abmo-x (via GitHub)" <gi...@apache.org>.
abmo-x commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1189039431


##########
spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryPartitionPathsIndex.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import org.apache.hadoop.fs._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
+
+
+/**
+ * Implement our own in-memory index which will only list directories to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses table's schema to only
+ * visit partition dirs using number of partition columns depth recursively. Does NOT return files
+ * within leaf dir.
+ */
+class InMemoryPartitionPathsIndex(
+                         sparkSession: SparkSession,
+                         rootPath: Path,
+                         parameters: Map[String, String],
+                         userSpecifiedSchema: StructType,
+                         fileStatusCache: FileStatusCache = NoopCache,
+                         override val metadataOpsTimeNs: Option[Long] = None)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, Some(userSpecifiedSchema), fileStatusCache) {
+
+  override val rootPaths = Seq(rootPath)
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override def allFiles(): Seq[FileStatus] = cachedLeafFiles.values.toSeq
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = cachedLeafFiles
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = cachedLeafDirToChildrenFiles
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafDirs(rootPath, 0)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.map(f => (f.getPath, Array(createEmptyChildDataFileStatus(f))))
+      .toMap
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+  /**
+   * Recursively lists only the partition dirs, using the number of partition cols

Review Comment:
   @szehon-ho 
   Correct, the original method may distribute the file-listing task across executors.
   
   However, since our implementation does **not** list files and _only_ lists directories, we don't need parallelism: there are far fewer directories than files, so listing just the dirs is fast compared to listing the files in each dir.
   
   As the benchmark shows, we were able to list and discover the partition dirs in under 1 second, without parallelism, in a folder with 7 million files.
   
   Tested with an input S3 folder with a large number of files.
   **All Files in path**: 7,169,419
   **Total size**: 6.7 TB
   
   Filtered on date and hour, where a single partition has 41,417 files:
   
   | Metric  | Before |  After     |
   | ------------- | ------------- | ------------- |
   | List Partitions Latency | 12M49.459S | 0.774S |
   
   
   





Re: [PR] Spark 3.2: Optimized add_files procedure's listPartitions [iceberg]

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#issuecomment-1855668224

   Closing this, as Spark 3.2 is no longer supported in the latest version.
   
   If the issue exists on other Spark versions, please handle it there.

