You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/05 23:10:09 UTC

[incubator-iceberg] branch master updated: Close open manifests during scan planning. (#150)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dbcd5c  Close open manifests during scan planning. (#150)
0dbcd5c is described below

commit 0dbcd5cfb141ce638604645dd5112802eca0567c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Apr 5 16:10:04 2019 -0700

    Close open manifests during scan planning. (#150)
---
 .../main/java/org/apache/iceberg/Filterable.java   |   3 +-
 .../java/org/apache/iceberg/io/CloseableGroup.java |  20 ----
 .../org/apache/iceberg/io/CloseableIterable.java   | 119 +++++++++++++++++----
 .../java/org/apache/iceberg/BaseTableScan.java     |  22 ++--
 .../java/org/apache/iceberg/FilteredManifest.java  |   6 ++
 .../java/org/apache/iceberg/ManifestGroup.java     |  14 +--
 .../java/org/apache/iceberg/ManifestReader.java    |   1 +
 .../org/apache/iceberg/util/ParallelIterable.java  |  20 ++--
 8 files changed, 137 insertions(+), 68 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Filterable.java b/api/src/main/java/org/apache/iceberg/Filterable.java
index 4eb8a50..7be5cf1 100644
--- a/api/src/main/java/org/apache/iceberg/Filterable.java
+++ b/api/src/main/java/org/apache/iceberg/Filterable.java
@@ -22,13 +22,14 @@ package org.apache.iceberg;
 import com.google.common.collect.Lists;
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
 
 /**
  * Methods to filter files in a snapshot or manifest when reading.
  *
  * @param <T> Java class returned by filter methods, also filterable
  */
-public interface Filterable<T extends Filterable<T>> extends Iterable<DataFile> {
+public interface Filterable<T extends Filterable<T>> extends CloseableIterable<DataFile> {
   /**
    * Selects the columns of a file manifest to read.
    * <p>
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java b/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
index 4eeab7b..f1cf61f 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Deque;
-import java.util.Iterator;
 
 public abstract class CloseableGroup implements Closeable {
   private final Deque<Closeable> closeables = Lists.newLinkedList();
@@ -41,23 +40,4 @@ public abstract class CloseableGroup implements Closeable {
       }
     }
   }
-
-  static class ClosingIterable<T> extends CloseableGroup implements CloseableIterable<T> {
-    private final Iterable<T> iterable;
-
-    ClosingIterable(Iterable<T> iterable, Iterable<Closeable> closeables) {
-      this.iterable = iterable;
-      if (iterable instanceof Closeable) {
-        addCloseable((Closeable) iterable);
-      }
-      for (Closeable closeable : closeables) {
-        addCloseable(closeable);
-      }
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      return iterable.iterator();
-    }
-  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 4baf2af..24c0198 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.io;
 
 import com.google.common.base.Preconditions;
+import org.apache.iceberg.exceptions.RuntimeIOException;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -41,34 +42,19 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
   }
 
   static <E> CloseableIterable<E> empty() {
-    return new CloseableIterable<E>() {
-      @Override
-      public void close() {
-      }
-
-      @Override
-      public Iterator<E> iterator() {
-        return Collections.emptyIterator();
-      }
-    };
+    return withNoopClose(Collections.emptyList());
   }
 
-  static <E> CloseableIterable<E> combine(Iterable<E> iterable, Iterable<Closeable> closeables) {
-    return new CloseableGroup.ClosingIterable<>(iterable, closeables);
-  }
-
-  static <I, O> CloseableIterable<O> wrap(CloseableIterable<I> iterable, Function<Iterable<I>, Iterable<O>> wrap) {
-    Iterable<O> wrappedIterable = wrap.apply(iterable);
-
-    return new CloseableIterable<O>() {
+  static <E> CloseableIterable<E> combine(Iterable<E> iterable, Closeable closeable) {
+    return new CloseableIterable<E>() {
       @Override
       public void close() throws IOException {
-        iterable.close();
+        closeable.close();
       }
 
       @Override
-      public Iterator<O> iterator() {
-        return wrappedIterable.iterator();
+      public Iterator<E> iterator() {
+        return iterable.iterator();
       }
     };
   }
@@ -100,4 +86,95 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
       }
     };
   }
+
+  static <E> CloseableIterable<E> concat(Iterable<CloseableIterable<E>> iterable) {
+    Iterator<CloseableIterable<E>> iterables = iterable.iterator();
+    if (!iterables.hasNext()) {
+      return empty();
+    } else {
+      return new ConcatCloseableIterable<>(iterable);
+    }
+  }
+
+  class ConcatCloseableIterable<E> extends CloseableGroup implements CloseableIterable<E> {
+    private final Iterable<CloseableIterable<E>> inputs;
+
+    ConcatCloseableIterable(Iterable<CloseableIterable<E>> inputs) {
+      this.inputs = inputs;
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+      ConcatCloseableIterator<E> iter = new ConcatCloseableIterator<>(inputs);
+      addCloseable(iter);
+      return iter;
+    }
+
+    private static class ConcatCloseableIterator<E> implements Iterator<E>, Closeable {
+      private final Iterator<CloseableIterable<E>> iterables;
+      private CloseableIterable<E> currentIterable = null;
+      private Iterator<E> currentIterator = null;
+      private boolean closed = false;
+
+      private ConcatCloseableIterator(Iterable<CloseableIterable<E>> inputs) {
+        this.iterables = inputs.iterator();
+        this.currentIterable = iterables.next();
+        this.currentIterator = currentIterable.iterator();
+      }
+
+      @Override
+      public boolean hasNext() {
+        if (closed) {
+          return false;
+        }
+
+        if (currentIterator.hasNext()) {
+          return true;
+        }
+
+        while (iterables.hasNext()) {
+          try {
+            currentIterable.close();
+          } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to close iterable");
+          }
+
+          this.currentIterable = iterables.next();
+          this.currentIterator = currentIterable.iterator();
+
+          if (currentIterator.hasNext()) {
+            return true;
+          }
+        }
+
+        try {
+          currentIterable.close();
+        } catch (IOException e) {
+          throw new RuntimeIOException(e, "Failed to close iterable");
+        }
+
+        this.closed = true;
+        this.currentIterator = null;
+        this.currentIterable = null;
+
+        return false;
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (!closed) {
+          currentIterable.close();
+          this.closed = true;
+          this.currentIterator = null;
+          this.currentIterable = null;
+        }
+      }
+
+      @Override
+      public E next() {
+        return currentIterator.next();
+      }
+    }
+  }
+
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 2e580d5..23f7876 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -28,14 +28,12 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import java.io.Closeable;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Function;
 import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
 import org.apache.iceberg.events.Listeners;
@@ -175,30 +173,26 @@ class BaseTableScan implements TableScan {
       Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
           manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
 
-      ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
-      Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
+      Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
           matchingManifests,
           manifest -> {
               ManifestReader reader = ManifestReader
                   .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
                   .caseSensitive(caseSensitive);
             PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
-            toClose.add(reader);
             String schemaString = SchemaParser.toJson(spec.schema());
             String specString = PartitionSpecParser.toJson(spec);
             ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
-            return Iterables.transform(
+            return CloseableIterable.transform(
                 reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
                 file -> new BaseFileScanTask(file, schemaString, specString, residuals)
             );
           });
 
       if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
-        return CloseableIterable.combine(
-            new ParallelIterable<>(readers, getWorkerPool()),
-            toClose);
+        return new ParallelIterable<>(readers, getWorkerPool());
       } else {
-        return CloseableIterable.combine(Iterables.concat(readers), toClose);
+        return CloseableIterable.concat(readers);
       }
 
     } else {
@@ -218,9 +212,11 @@ class BaseTableScan implements TableScan {
 
     Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
 
+    CloseableIterable<FileScanTask> splitFiles = splitFiles(splitSize);
     return CloseableIterable.transform(
-        CloseableIterable.wrap(splitFiles(splitSize), splits ->
-            new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc, true)),
+        CloseableIterable.combine(
+            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
+            splitFiles),
         BaseCombinedScanTask::new);
   }
 
@@ -255,7 +251,7 @@ class BaseTableScan implements TableScan {
         .from(fileScanTasks)
         .transformAndConcat(input -> input.split(splitSize));
     // Capture manifests which can be closed after scan planning
-    return CloseableIterable.combine(splitTasks, ImmutableList.of(fileScanTasks));
+    return CloseableIterable.combine(splitTasks, fileScanTasks);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index d1addc0..de23b7f 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import org.apache.iceberg.ManifestEntry.Status;
@@ -129,6 +130,11 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
   private Evaluator evaluator() {
     if (lazyEvaluator == null) {
       if (partFilter != null) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 3ca6b54..2da4082 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -107,7 +107,6 @@ class ManifestGroup {
    */
   public CloseableIterable<ManifestEntry> entries() {
     Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
-    List<Closeable> toClose = Lists.newArrayList();
 
     Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
         manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
@@ -120,19 +119,20 @@ class ManifestGroup {
               manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
     }
 
-    Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
+    Iterable<CloseableIterable<ManifestEntry>> readers = Iterables.transform(
         matchingManifests,
         manifest -> {
           ManifestReader reader = ManifestReader.read(
               ops.io().newInputFile(manifest.path()),
               ops.current()::spec);
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
-          toClose.add(reader);
-          return Iterables.filter(
-              ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
-              entry -> evaluator.eval((GenericDataFile) entry.file()));
+          return CloseableIterable.combine(
+              Iterables.filter(
+                  ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
+                  entry -> evaluator.eval((GenericDataFile) entry.file())),
+              reader);
         });
 
-    return CloseableIterable.combine(Iterables.concat(readers), toClose);
+    return CloseableIterable.concat(readers);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 32fba63..0a2d87f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.io.IOException;
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index a2fb875..de2e061 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -22,18 +22,21 @@ package org.apache.iceberg.util;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
 
-public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
-  private final Iterable<Iterable<T>> iterables;
+public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+  private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
 
-  public ParallelIterable(Iterable<Iterable<T>> iterables,
+  public ParallelIterable(Iterable<? extends Iterable<T>> iterables,
                           ExecutorService workerPool) {
     this.iterables = iterables;
     this.workerPool = workerPool;
@@ -53,12 +56,17 @@ public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
     private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
     private boolean closed = false;
 
-    private ParallelIterator(Iterable<Iterable<T>> iterables,
+    private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
                              ExecutorService workerPool) {
       this.tasks = Iterables.transform(iterables, iterable ->
           (Runnable) () -> {
-            for (T item : iterable) {
-              queue.add(item);
+            try (Closeable ignored = (iterable instanceof Closeable) ?
+                (Closeable) iterable : () -> {}) {
+              for (T item : iterable) {
+                queue.add(item);
+              }
+            } catch (IOException e) {
+              throw new RuntimeIOException(e, "Failed to close iterable");
             }
           }).iterator();
       this.workerPool = workerPool;