Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/05 16:30:10 UTC

[GitHub] [iceberg] deniskuzZ opened a new pull request, #6527: DRAFT: Iceberg delete files are read twice during query processing causing delays

deniskuzZ opened a new pull request, #6527:
URL: https://github.com/apache/iceberg/pull/6527

   …elays


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199817615


##########
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java:
##########
@@ -46,4 +46,13 @@ public boolean isDeleted(long position) {
   public boolean isEmpty() {
     return roaring64Bitmap.isEmpty();
   }
+
+  @Override
+  public PositionDeleteIndex or(PositionDeleteIndex deleteIndex) {
+    if (deleteIndex instanceof BitmapPositionDeleteIndex) {
+      roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
+      return this;
+    }
+    throw new IllegalArgumentException();

Review Comment:
   This should not throw an exception with no context. Please use `Preconditions` and produce a helpful error message.
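   For illustration only, the requested check might look roughly like the sketch below. It is self-contained, so `java.util.BitSet` stands in for `Roaring64Bitmap`, and the local `checkArgument` helper is a hypothetical stand-in for a Guava-style `Preconditions.checkArgument`. The class name and the message text are assumptions, not code from this PR.

```java
import java.util.BitSet;

// Sketch only: BitSet stands in for Roaring64Bitmap, and checkArgument is a
// local stand-in for a Guava-style Preconditions.checkArgument helper.
class BitmapIndexSketch {
  private final BitSet bitmap = new BitSet();

  // Hypothetical helper: throws with a formatted message when the check fails.
  static void checkArgument(boolean condition, String template, Object arg) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, arg));
    }
  }

  void delete(int position) {
    bitmap.set(position);
  }

  boolean isDeleted(int position) {
    return bitmap.get(position);
  }

  // Merges another index into this one; rejects unsupported types with context.
  BitmapIndexSketch or(Object other) {
    checkArgument(
        other instanceof BitmapIndexSketch,
        "Cannot merge delete index of unsupported type: %s",
        other == null ? "null" : other.getClass().getName());
    bitmap.or(((BitmapIndexSketch) other).bitmap);
    return this;
  }

  public static void main(String[] args) {
    BitmapIndexSketch left = new BitmapIndexSketch();
    left.delete(1);
    BitmapIndexSketch right = new BitmapIndexSketch();
    right.delete(5);
    left.or(right);
    System.out.println(left.isDeleted(1) && left.isDeleted(5));

    try {
      left.or("not an index");
    } catch (IllegalArgumentException e) {
      // The message names the offending type instead of being empty.
      System.out.println(e.getMessage());
    }
  }
}
```

   The point of the sketch is only that the failure names the unexpected type, so whoever hits it can tell which index implementation was passed in.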





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199816744


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -306,41 +357,20 @@ protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
   }
 
   private static class DataFileFilter<T extends StructLike> extends Filter<T> {
-    private final CharSequence dataLocation;
+    private final CharSequenceSet dataLocation;
 
-    DataFileFilter(CharSequence dataLocation) {
+    DataFileFilter(CharSequenceSet dataLocation) {
       this.dataLocation = dataLocation;
     }
 
-    @Override
-    protected boolean shouldKeep(T posDelete) {
-      return charSeqEquals(dataLocation, (CharSequence) FILENAME_ACCESSOR.get(posDelete));
+    DataFileFilter(CharSequence dataLocation) {
+      this.dataLocation = CharSequenceSet.of(ImmutableList.of(dataLocation));
     }
 
-    private boolean charSeqEquals(CharSequence s1, CharSequence s2) {
-      if (s1 == s2) {
-        return true;
-      }
-
-      int count = s1.length();
-      if (count != s2.length()) {
-        return false;
-      }
-
-      if (s1 instanceof String && s2 instanceof String && s1.hashCode() != s2.hashCode()) {
-        return false;
-      }
-
-      // File paths inside a delete file normally have more identical chars at the beginning. For
-      // example, a typical
-      // path is like "s3:/bucket/db/table/data/partition/00000-0-[uuid]-00001.parquet".
-      // The uuid is where the difference starts. So it's faster to find the first diff backward.
-      for (int i = count - 1; i >= 0; i--) {
-        if (s1.charAt(i) != s2.charAt(i)) {
-          return false;
-        }
-      }
-      return true;
+    @Override
+    @SuppressWarnings("CollectionUndefinedEquality")
+    protected boolean shouldKeep(T posDelete) {
+      return dataLocation.contains(FILENAME_ACCESSOR.get(posDelete));

Review Comment:
   Why does this remove the optimization above?







[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200574398


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),
+                    row ->
+                        Pair.of(
+                            (String) FILENAME_ACCESSOR.get(row),
+                            (Long) POSITION_ACCESSOR.get(row))));
+
+    try (CloseableIterable<Pair<String, Long>> deletes = CloseableIterable.concat(positions)) {
+      Map<String, PositionDeleteIndex> positionDeleteIndex = Maps.newHashMap();
+      deletes.forEach(
+          entry ->
+              positionDeleteIndex
+                  .computeIfAbsent(entry.first(), f -> new BitmapPositionDeleteIndex())

Review Comment:
   I don't see any issue here, since nothing extra is done when the key already exists:
   ````
   default V computeIfAbsent(K key,
           Function<? super K, ? extends V> mappingFunction) {
       Objects.requireNonNull(mappingFunction);
       V v;
       if ((v = get(key)) == null) {
           V newValue;
           if ((newValue = mappingFunction.apply(key)) != null) {
               put(key, newValue);
               return newValue;
           }
       }

       return v;
   }
   ````
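   A small self-contained sketch of the behavior described above: it groups (file, position) pairs into one bitmap per file and counts how often the mapping function runs. `java.util.BitSet` stands in for `BitmapPositionDeleteIndex`, and the class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: BitSet stands in for BitmapPositionDeleteIndex.
class ComputeIfAbsentSketch {

  // Adds each (file, position) pair to the per-file bitmap and returns how many
  // bitmaps were created, i.e. how many times the mapping function was invoked.
  static int buildIndex(String[] files, int[] positions, Map<String, BitSet> index) {
    AtomicInteger created = new AtomicInteger();
    for (int i = 0; i < files.length; i++) {
      index
          .computeIfAbsent(
              files[i],
              file -> {
                created.incrementAndGet();
                return new BitSet();
              })
          .set(positions[i]);
    }
    return created.get();
  }

  public static void main(String[] args) {
    String[] files = {"a.parquet", "a.parquet", "b.parquet", "a.parquet"};
    int[] positions = {0, 3, 7, 9};
    Map<String, BitSet> index = new HashMap<>();

    int created = buildIndex(files, positions, index);
    System.out.println("bitmaps created: " + created);
    System.out.println("positions for a.parquet: " + index.get("a.parquet").cardinality());
  }
}
```

   With four input rows spread over two files, the mapping function runs once per distinct file; the remaining calls only look up the existing bitmap.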





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200567779


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -306,41 +357,20 @@ protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
   }
 
   private static class DataFileFilter<T extends StructLike> extends Filter<T> {
-    private final CharSequence dataLocation;
+    private final CharSequenceSet dataLocation;
 
-    DataFileFilter(CharSequence dataLocation) {
+    DataFileFilter(CharSequenceSet dataLocation) {
       this.dataLocation = dataLocation;
     }
 
-    @Override
-    protected boolean shouldKeep(T posDelete) {
-      return charSeqEquals(dataLocation, (CharSequence) FILENAME_ACCESSOR.get(posDelete));
+    DataFileFilter(CharSequence dataLocation) {
+      this.dataLocation = CharSequenceSet.of(ImmutableList.of(dataLocation));
     }
 
-    private boolean charSeqEquals(CharSequence s1, CharSequence s2) {
-      if (s1 == s2) {
-        return true;
-      }
-
-      int count = s1.length();
-      if (count != s2.length()) {
-        return false;
-      }
-
-      if (s1 instanceof String && s2 instanceof String && s1.hashCode() != s2.hashCode()) {
-        return false;
-      }
-
-      // File paths inside a delete file normally have more identical chars at the beginning. For
-      // example, a typical
-      // path is like "s3:/bucket/db/table/data/partition/00000-0-[uuid]-00001.parquet".
-      // The uuid is where the difference starts. So it's faster to find the first diff backward.
-      for (int i = count - 1; i >= 0; i--) {
-        if (s1.charAt(i) != s2.charAt(i)) {
-          return false;
-        }
-      }
-      return true;
+    @Override
+    @SuppressWarnings("CollectionUndefinedEquality")
+    protected boolean shouldKeep(T posDelete) {
+      return dataLocation.contains(FILENAME_ACCESSOR.get(posDelete));

Review Comment:
   restored



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +64,18 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  /**
+   * Return an {@link ExecutorService} that uses "delete worker" thread-pool.
+   *
+   * <p>The size of this thread-pool is controlled by the Java system property {@code
+   * iceberg.worker.delete-num-threads}.
+   *
+   * @return an {@link ExecutorService} that uses delete worker pool
+   */
+  public static ExecutorService getDeleteWorkerPool() {

Review Comment:
   removed





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199817394


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +64,18 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  /**
+   * Return an {@link ExecutorService} that uses "delete worker" thread-pool.
+   *
+   * <p>The size of this thread-pool is controlled by the Java system property {@code
+   * iceberg.worker.delete-num-threads}.
+   *
+   * @return an {@link ExecutorService} that uses delete worker pool
+   */
+  public static ExecutorService getDeleteWorkerPool() {

Review Comment:
   Can you remove the changes that overlap with #6432?





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200563839


##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -367,4 +389,86 @@ private static Schema fileProjection(
 
     return new Schema(columns);
   }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, FileIO io) {
+    return createPosIndexMap(fileTasks, DEFAULT_SET_FILTER_THRESHOLD, io);
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, long filterThreshold, FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < filterThreshold) {
+      CharSequenceSet filePaths =
+          CharSequenceSet.of(Iterables.transform(fileTasks, task -> task.file().path()));
+      List<CloseableIterable<Record>> deletes =
+          Lists.transform(posDeletes, row -> openPosDeletes(io, row));
+      return Deletes.toPositionIndexMap(filePaths, deletes);
+    }
+    return null;
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Cache<CharSequence, Map<String, PositionDeleteIndex>> posIndexCache,
+      Iterable<FileScanTask> fileTasks,
+      FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum()
+        < DEFAULT_SET_FILTER_THRESHOLD) {
+      CharSequenceSet filePaths =
+          CharSequenceSet.of(Iterables.transform(fileTasks, task -> task.file().path()));
+
+      Map<String, PositionDeleteIndex> posIndexMap = Maps.newConcurrentMap();
+
+      // open all of the delete files in parallel, use index to avoid reordering
+      Tasks.range(posDeletes.size())
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(ThreadPools.getDeleteWorkerPool())
+          .run(
+              index -> {
+                DeleteFile deleteFile = posDeletes.get(index);
+
+                Maps.filterKeys(
+                        posIndexCache.get(
+                            deleteFile.path(),
+                            func -> {
+                              LOG.debug("Cache miss: {}", deleteFile.path());
+                              Instant start = Instant.now();
+                              Map<String, PositionDeleteIndex> res =
+                                  Deletes.toPositionIndexMap(openPosDeletes(io, deleteFile));
+                              LOG.debug(
+                                  "Cache load: {}; Time taken: {} ms;",
+                                  deleteFile.path(),
+                                  Duration.between(start, Instant.now()).toMillis());
+                              return res;
+                            }),
+                        filePaths::contains)
+                    .forEach(
+                        (key, value) -> posIndexMap.merge(key, value, PositionDeleteIndex::or));
+              });
+      LOG.debug("Cache size: {}", posIndexCache.estimatedSize());
+
+      return posIndexMap;
+    }
+    return null;

Review Comment:
   It's not. There is special handling logic in the PositionalDeletes class:
   ````
       Optional<PositionDeleteIndex> positionIndex =
           Optional.ofNullable(positionIndexMap).map(cache -> cache.get(filePath));
   
       boolean skipPosDeletes = positionIndexMap != null && !positionIndex.isPresent();
   ````
   I tried returning an empty `Optional` when we can't push the delete ids into memory; however, I think that just complicated the code.
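   To spell out the three cases that snippet distinguishes, here is a self-contained sketch. The `Plan` enum, the class name, and the `BitSet` stand-in for `PositionDeleteIndex` are hypothetical. Treating a `null` map as "fall back to reading the delete files" is my reading of the comment above, not something the snippet itself confirms.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: BitSet stands in for PositionDeleteIndex; Plan is a hypothetical enum.
class PosIndexLookupSketch {
  enum Plan {
    USE_INDEX, // the map holds an index for this data file
    SKIP_POS_DELETES, // the map was built, but no deletes target this data file
    READ_DELETE_FILES // no map (null): assumed fallback to reading the delete files
  }

  static Plan plan(Map<String, BitSet> positionIndexMap, String filePath) {
    // Optional.map yields an empty Optional when the lookup returns null.
    Optional<BitSet> positionIndex =
        Optional.ofNullable(positionIndexMap).map(cache -> cache.get(filePath));
    boolean skipPosDeletes = positionIndexMap != null && !positionIndex.isPresent();

    if (positionIndex.isPresent()) {
      return Plan.USE_INDEX;
    }
    return skipPosDeletes ? Plan.SKIP_POS_DELETES : Plan.READ_DELETE_FILES;
  }

  public static void main(String[] args) {
    Map<String, BitSet> indexMap = new HashMap<>();
    indexMap.put("a.parquet", new BitSet());

    System.out.println(plan(indexMap, "a.parquet"));
    System.out.println(plan(indexMap, "b.parquet"));
    System.out.println(plan(null, "a.parquet"));
  }
}
```

   The sketch shows why `null` and an empty map are not interchangeable here: an empty or non-matching map means "nothing to delete for this file", while `null` means no in-memory index was built at all.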





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200567199


##########
spark/v3.3/build.gradle:
##########
@@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")
 
     compileOnly "com.google.errorprone:error_prone_annotations"
+    compileOnly "com.github.ben-manes.caffeine:caffeine"

Review Comment:
   There is a change in GenericReader that could reuse the positionalDeletes info between tasks from the same split.





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200569613


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),

Review Comment:
   fixed



##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),
+                    row ->
+                        Pair.of(

Review Comment:
   fixed





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200575206


##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -266,13 +291,23 @@ private CloseableIterable<T> createDeleteIterable(
         : Deletes.filterDeleted(records, isDeleted, counter);
   }
 
+  static CloseableIterable<Record> openPosDeletes(FileIO io, DeleteFile file) {
+    InputFile input = io.newInputFile(file.path().toString());

Review Comment:
   fixed



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -367,4 +389,86 @@ private static Schema fileProjection(
 
     return new Schema(columns);
   }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, FileIO io) {
+    return createPosIndexMap(fileTasks, DEFAULT_SET_FILTER_THRESHOLD, io);
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, long filterThreshold, FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }

Review Comment:
   fixed







Re: [PR] Iceberg delete files are read multiple times during query processing causing delays [iceberg]

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ closed pull request #6527: Iceberg delete files are read multiple times during query processing causing delays
URL: https://github.com/apache/iceberg/pull/6527




[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199818612


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),
+                    row ->
+                        Pair.of(

Review Comment:
   There's no need to convert to `Pair` only to consume those pairs in the same function. Just use the accessors below.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199819648


##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -266,13 +291,23 @@ private CloseableIterable<T> createDeleteIterable(
         : Deletes.filterDeleted(records, isDeleted, counter);
   }
 
+  static CloseableIterable<Record> openPosDeletes(FileIO io, DeleteFile file) {
+    InputFile input = io.newInputFile(file.path().toString());

Review Comment:
   This should use `getInputFile` instead of calling `io.newInputFile`. In Spark and Flink, the input files are already created in a bulk operation.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199820573


##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -367,4 +389,86 @@ private static Schema fileProjection(
 
     return new Schema(columns);
   }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, FileIO io) {
+    return createPosIndexMap(fileTasks, DEFAULT_SET_FILTER_THRESHOLD, io);
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, long filterThreshold, FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < filterThreshold) {
+      CharSequenceSet filePaths =
+          CharSequenceSet.of(Iterables.transform(fileTasks, task -> task.file().path()));
+      List<CloseableIterable<Record>> deletes =
+          Lists.transform(posDeletes, row -> openPosDeletes(io, row));
+      return Deletes.toPositionIndexMap(filePaths, deletes);
+    }
+    return null;
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Cache<CharSequence, Map<String, PositionDeleteIndex>> posIndexCache,
+      Iterable<FileScanTask> fileTasks,
+      FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum()
+        < DEFAULT_SET_FILTER_THRESHOLD) {
+      CharSequenceSet filePaths =
+          CharSequenceSet.of(Iterables.transform(fileTasks, task -> task.file().path()));
+
+      Map<String, PositionDeleteIndex> posIndexMap = Maps.newConcurrentMap();
+
+      // open all of the delete files in parallel, use index to avoid reordering
+      Tasks.range(posDeletes.size())
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(ThreadPools.getDeleteWorkerPool())
+          .run(
+              index -> {
+                DeleteFile deleteFile = posDeletes.get(index);
+
+                Maps.filterKeys(
+                        posIndexCache.get(
+                            deleteFile.path(),
+                            func -> {
+                              LOG.debug("Cache miss: {}", deleteFile.path());
+                              Instant start = Instant.now();
+                              Map<String, PositionDeleteIndex> res =
+                                  Deletes.toPositionIndexMap(openPosDeletes(io, deleteFile));
+                              LOG.debug(
+                                  "Cache load: {}; Time taken: {} ms;",
+                                  deleteFile.path(),
+                                  Duration.between(start, Instant.now()).toMillis());
+                              return res;
+                            }),
+                        filePaths::contains)
+                    .forEach(
+                        (key, value) -> posIndexMap.merge(key, value, PositionDeleteIndex::or));
+              });
+      LOG.debug("Cache size: {}", posIndexCache.estimatedSize());
+
+      return posIndexMap;
+    }
+    return null;

Review Comment:
   This looks like a correctness bug. Returning `null` here can't be allowed to ignore deletes when there are too many.
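
One way to read this concern, as a hedged sketch: the size threshold should only choose the lookup strategy, never whether deletes are applied. The class, method, and parameter names below are hypothetical, and the linear scan merely stands in for whatever streaming fallback the real code would use.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongPredicate;

class DeleteThresholdSketch {
  // Always returns a usable "is this position deleted?" check, whatever the input size.
  static LongPredicate isDeleted(List<Long> deletedPositions, long threshold) {
    if (deletedPositions.size() < threshold) {
      // Few deletes: materialize them in a hash set for constant-time lookups.
      Set<Long> inMemory = new HashSet<>(deletedPositions);
      return pos -> inMemory.contains(pos);
    }

    // Too many to index: use a slower lookup instead of returning null.
    // The linear scan is a placeholder for a streaming strategy.
    return pos -> deletedPositions.contains(pos);
  }

  public static void main(String[] args) {
    List<Long> deletes = Arrays.asList(1L, 5L);
    System.out.println(isDeleted(deletes, 10).test(5L)); // prints "true"
    System.out.println(isDeleted(deletes, 1).test(5L)); // prints "true"
  }
}
```

Both branches give the same answers; only the cost of a lookup differs.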





[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200567199


##########
spark/v3.3/build.gradle:
##########
@@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")
 
     compileOnly "com.google.errorprone:error_prone_annotations"
+    compileOnly "com.github.ben-manes.caffeine:caffeine"

Review Comment:
   There is a change in `GenericReader` that could reuse position delete info between tasks from the same split; see https://github.com/apache/iceberg/pull/6527/files#diff-98d1b57871903c422d33d86cc7781f33b844cef31c58938218d8fcc439b12131R76-R80





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199818762


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),
+                    row ->
+                        Pair.of(
+                            (String) FILENAME_ACCESSOR.get(row),
+                            (Long) POSITION_ACCESSOR.get(row))));
+
+    try (CloseableIterable<Pair<String, Long>> deletes = CloseableIterable.concat(positions)) {
+      Map<String, PositionDeleteIndex> positionDeleteIndex = Maps.newHashMap();
+      deletes.forEach(
+          entry ->
+              positionDeleteIndex
+                  .computeIfAbsent(entry.first(), f -> new BitmapPositionDeleteIndex())

Review Comment:
   Instead of using `computeIfAbsent` on every row, this should pre-populate the map using `dataLocations`, since those are all known.
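
A hedged sketch of the pre-population idea: create one empty index per known data location up front, then do a plain `get` per row. `DeleteRow` and the `TreeSet<Long>` value type are hypothetical stand-ins for the delete row and `BitmapPositionDeleteIndex`; the method only mirrors the shape of `toPositionIndexMap`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PrePopulatedIndexSketch {
  // Hypothetical stand-in for one position-delete row.
  static final class DeleteRow {
    final String path;
    final long pos;

    DeleteRow(String path, long pos) {
      this.path = path;
      this.pos = pos;
    }
  }

  static Map<String, TreeSet<Long>> toPositionIndexMap(
      Set<String> dataLocations, Iterable<DeleteRow> rows) {
    // All data locations are known in advance, so create their indexes once.
    Map<String, TreeSet<Long>> index = new HashMap<>();
    for (String location : dataLocations) {
      index.put(location, new TreeSet<>());
    }

    // A plain lookup per row; rows for other data files are skipped.
    for (DeleteRow row : rows) {
      TreeSet<Long> positions = index.get(row.path);
      if (positions != null) {
        positions.add(row.pos);
      }
    }

    return index;
  }

  public static void main(String[] args) {
    Set<String> locations = new HashSet<>(Arrays.asList("a.parquet", "b.parquet"));
    Iterable<DeleteRow> rows =
        Arrays.asList(new DeleteRow("a.parquet", 3L), new DeleteRow("c.parquet", 9L));
    System.out.println(toPositionIndexMap(locations, rows));
  }
}
```

One consequence of this shape is that a data file with no matching deletes maps to an empty index rather than being absent, so callers would check for emptiness instead of `null`.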





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199818257


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -142,6 +147,52 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
     }
   }
 
+  public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndexMap(
+      CharSequenceSet dataLocations, List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
+    List<CloseableIterable<Pair<String, Long>>> positions =
+        Lists.transform(
+            deleteFiles,
+            deletes ->
+                CloseableIterable.transform(
+                    locationFilter.filter(deletes),

Review Comment:
   Changing the filter has removed the need for having one in the first place. Instead, I think this should use `CloseableIterable.filter` directly.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199820340


##########
spark/v3.3/build.gradle:
##########
@@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")
 
     compileOnly "com.google.errorprone:error_prone_annotations"
+    compileOnly "com.github.ben-manes.caffeine:caffeine"

Review Comment:
   These changes look incorrect. Why is this new compile dependency needed when there is no code change?





[GitHub] [iceberg] deniskuzZ commented on pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#issuecomment-1557276374

   @rdblue, thank you for the review! I've addressed most of the comments and replied to the others.




[GitHub] [iceberg] deniskuzZ commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1200567999


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +64,18 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  /**
+   * Return an {@link ExecutorService} that uses "delete worker" thread-pool.
+   *
+   * <p>The size of this thread-pool is controlled by the Java system property {@code
+   * iceberg.worker.delete-num-threads}.
+   *
+   * @return an {@link ExecutorService} that uses delete worker pool
+   */
+  public static ExecutorService getDeleteWorkerPool() {

Review Comment:
   Removed; however, https://github.com/apache/iceberg/pull/6432 looks abandoned.



##########
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java:
##########
@@ -46,4 +46,13 @@ public boolean isDeleted(long position) {
   public boolean isEmpty() {
     return roaring64Bitmap.isEmpty();
   }
+
+  @Override
+  public PositionDeleteIndex or(PositionDeleteIndex deleteIndex) {
+    if (deleteIndex instanceof BitmapPositionDeleteIndex) {
+      roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
+      return this;
+    }
+    throw new IllegalArgumentException();

Review Comment:
   Fixed.
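
The actual fix is not shown in this thread. As a hedged illustration of the kind of contextual error the review asked for, the sketch below rejects an unsupported index type with a descriptive message. `DeleteIndex` and `BitSetIndex` are hypothetical stand-ins for `PositionDeleteIndex` and `BitmapPositionDeleteIndex`, and plain Java is used instead of `Preconditions` only to keep the example dependency-free.

```java
import java.util.BitSet;

class MergeWithContextSketch {
  // Hypothetical stand-in for PositionDeleteIndex.
  interface DeleteIndex {
    boolean isDeleted(int pos);
  }

  // Hypothetical stand-in for BitmapPositionDeleteIndex, backed by java.util.BitSet.
  static final class BitSetIndex implements DeleteIndex {
    private final BitSet bits = new BitSet();

    void delete(int pos) {
      bits.set(pos);
    }

    @Override
    public boolean isDeleted(int pos) {
      return bits.get(pos);
    }

    // Merges another index into this one; unsupported types fail with a clear message.
    BitSetIndex or(DeleteIndex other) {
      if (!(other instanceof BitSetIndex)) {
        String type = other == null ? "null" : other.getClass().getName();
        throw new IllegalArgumentException(
            String.format(
                "Cannot merge delete index of type %s into %s", type, getClass().getName()));
      }

      bits.or(((BitSetIndex) other).bits);
      return this;
    }
  }

  public static void main(String[] args) {
    BitSetIndex left = new BitSetIndex();
    left.delete(1);
    BitSetIndex right = new BitSetIndex();
    right.delete(4);
    System.out.println(left.or(right).isDeleted(4)); // prints "true"
  }
}
```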





[GitHub] [iceberg] deniskuzZ commented on pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#issuecomment-1567943648

   @rdblue, gentle reminder. Please take a look once you get a chance.




[GitHub] [iceberg] rdblue commented on a diff in pull request #6527: Iceberg delete files are read multiple times during query processing causing delays

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#discussion_r1199819979


##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -367,4 +389,86 @@ private static Schema fileProjection(
 
     return new Schema(columns);
   }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, FileIO io) {
+    return createPosIndexMap(fileTasks, DEFAULT_SET_FILTER_THRESHOLD, io);
+  }
+
+  public static Map<String, PositionDeleteIndex> createPosIndexMap(
+      Iterable<FileScanTask> fileTasks, long filterThreshold, FileIO io) {
+    List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
+    if (posDeletes.isEmpty()) {
+      return ImmutableMap.of();
+    }

Review Comment:
   Please follow style guidelines. Control flow blocks should be separated from the following statement by a line of whitespace.





Re: [PR] Iceberg delete files are read multiple times during query processing causing delays [iceberg]

Posted by "deniskuzZ (via GitHub)" <gi...@apache.org>.
deniskuzZ commented on PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#issuecomment-1785010901

   @aokolnychyi, could you please help with the review? thanks!




Re: [PR] Iceberg delete files are read multiple times during query processing causing delays [iceberg]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #6527:
URL: https://github.com/apache/iceberg/pull/6527#issuecomment-1785114341

   I believe @bryanck also ran into this; he might be interested in reviewing it as well.

