You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/05 23:10:09 UTC
[incubator-iceberg] branch master updated: Close open manifests during scan planning. (#150)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0dbcd5c Close open manifests during scan planning. (#150)
0dbcd5c is described below
commit 0dbcd5cfb141ce638604645dd5112802eca0567c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Apr 5 16:10:04 2019 -0700
Close open manifests during scan planning. (#150)
---
.../main/java/org/apache/iceberg/Filterable.java | 3 +-
.../java/org/apache/iceberg/io/CloseableGroup.java | 20 ----
.../org/apache/iceberg/io/CloseableIterable.java | 119 +++++++++++++++++----
.../java/org/apache/iceberg/BaseTableScan.java | 22 ++--
.../java/org/apache/iceberg/FilteredManifest.java | 6 ++
.../java/org/apache/iceberg/ManifestGroup.java | 14 +--
.../java/org/apache/iceberg/ManifestReader.java | 1 +
.../org/apache/iceberg/util/ParallelIterable.java | 20 ++--
8 files changed, 137 insertions(+), 68 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/Filterable.java b/api/src/main/java/org/apache/iceberg/Filterable.java
index 4eb8a50..7be5cf1 100644
--- a/api/src/main/java/org/apache/iceberg/Filterable.java
+++ b/api/src/main/java/org/apache/iceberg/Filterable.java
@@ -22,13 +22,14 @@ package org.apache.iceberg;
import com.google.common.collect.Lists;
import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
/**
* Methods to filter files in a snapshot or manifest when reading.
*
* @param <T> Java class returned by filter methods, also filterable
*/
-public interface Filterable<T extends Filterable<T>> extends Iterable<DataFile> {
+public interface Filterable<T extends Filterable<T>> extends CloseableIterable<DataFile> {
/**
* Selects the columns of a file manifest to read.
* <p>
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java b/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
index 4eeab7b..f1cf61f 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableGroup.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.util.Deque;
-import java.util.Iterator;
public abstract class CloseableGroup implements Closeable {
private final Deque<Closeable> closeables = Lists.newLinkedList();
@@ -41,23 +40,4 @@ public abstract class CloseableGroup implements Closeable {
}
}
}
-
- static class ClosingIterable<T> extends CloseableGroup implements CloseableIterable<T> {
- private final Iterable<T> iterable;
-
- ClosingIterable(Iterable<T> iterable, Iterable<Closeable> closeables) {
- this.iterable = iterable;
- if (iterable instanceof Closeable) {
- addCloseable((Closeable) iterable);
- }
- for (Closeable closeable : closeables) {
- addCloseable(closeable);
- }
- }
-
- @Override
- public Iterator<T> iterator() {
- return iterable.iterator();
- }
- }
}
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 4baf2af..24c0198 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;
import com.google.common.base.Preconditions;
+import org.apache.iceberg.exceptions.RuntimeIOException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -41,34 +42,19 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
}
static <E> CloseableIterable<E> empty() {
- return new CloseableIterable<E>() {
- @Override
- public void close() {
- }
-
- @Override
- public Iterator<E> iterator() {
- return Collections.emptyIterator();
- }
- };
+ return withNoopClose(Collections.emptyList());
}
- static <E> CloseableIterable<E> combine(Iterable<E> iterable, Iterable<Closeable> closeables) {
- return new CloseableGroup.ClosingIterable<>(iterable, closeables);
- }
-
- static <I, O> CloseableIterable<O> wrap(CloseableIterable<I> iterable, Function<Iterable<I>, Iterable<O>> wrap) {
- Iterable<O> wrappedIterable = wrap.apply(iterable);
-
- return new CloseableIterable<O>() {
+ static <E> CloseableIterable<E> combine(Iterable<E> iterable, Closeable closeable) {
+ return new CloseableIterable<E>() {
@Override
public void close() throws IOException {
- iterable.close();
+ closeable.close();
}
@Override
- public Iterator<O> iterator() {
- return wrappedIterable.iterator();
+ public Iterator<E> iterator() {
+ return iterable.iterator();
}
};
}
@@ -100,4 +86,95 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
}
};
}
+
+ static <E> CloseableIterable<E> concat(Iterable<CloseableIterable<E>> iterable) {
+ Iterator<CloseableIterable<E>> iterables = iterable.iterator();
+ if (!iterables.hasNext()) {
+ return empty();
+ } else {
+ return new ConcatCloseableIterable<>(iterable);
+ }
+ }
+
+ class ConcatCloseableIterable<E> extends CloseableGroup implements CloseableIterable<E> {
+ private final Iterable<CloseableIterable<E>> inputs;
+
+ ConcatCloseableIterable(Iterable<CloseableIterable<E>> inputs) {
+ this.inputs = inputs;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ ConcatCloseableIterator<E> iter = new ConcatCloseableIterator<>(inputs);
+ addCloseable(iter);
+ return iter;
+ }
+
+ private static class ConcatCloseableIterator<E> implements Iterator<E>, Closeable {
+ private final Iterator<CloseableIterable<E>> iterables;
+ private CloseableIterable<E> currentIterable = null;
+ private Iterator<E> currentIterator = null;
+ private boolean closed = false;
+
+ private ConcatCloseableIterator(Iterable<CloseableIterable<E>> inputs) {
+ this.iterables = inputs.iterator();
+ this.currentIterable = iterables.next();
+ this.currentIterator = currentIterable.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (closed) {
+ return false;
+ }
+
+ if (currentIterator.hasNext()) {
+ return true;
+ }
+
+ while (iterables.hasNext()) {
+ try {
+ currentIterable.close();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close iterable");
+ }
+
+ this.currentIterable = iterables.next();
+ this.currentIterator = currentIterable.iterator();
+
+ if (currentIterator.hasNext()) {
+ return true;
+ }
+ }
+
+ try {
+ currentIterable.close();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close iterable");
+ }
+
+ this.closed = true;
+ this.currentIterator = null;
+ this.currentIterable = null;
+
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ currentIterable.close();
+ this.closed = true;
+ this.currentIterator = null;
+ this.currentIterable = null;
+ }
+ }
+
+ @Override
+ public E next() {
+ return currentIterator.next();
+ }
+ }
+ }
+
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 2e580d5..23f7876 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -28,14 +28,12 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import java.io.Closeable;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.events.Listeners;
@@ -175,30 +173,26 @@ class BaseTableScan implements TableScan {
Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
- ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
- Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
+ Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader
.read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
.caseSensitive(caseSensitive);
PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
- toClose.add(reader);
String schemaString = SchemaParser.toJson(spec.schema());
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
- return Iterables.transform(
+ return CloseableIterable.transform(
reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
file -> new BaseFileScanTask(file, schemaString, specString, residuals)
);
});
if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
- return CloseableIterable.combine(
- new ParallelIterable<>(readers, getWorkerPool()),
- toClose);
+ return new ParallelIterable<>(readers, getWorkerPool());
} else {
- return CloseableIterable.combine(Iterables.concat(readers), toClose);
+ return CloseableIterable.concat(readers);
}
} else {
@@ -218,9 +212,11 @@ class BaseTableScan implements TableScan {
Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
+ CloseableIterable<FileScanTask> splitFiles = splitFiles(splitSize);
return CloseableIterable.transform(
- CloseableIterable.wrap(splitFiles(splitSize), splits ->
- new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc, true)),
+ CloseableIterable.combine(
+ new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
+ splitFiles),
BaseCombinedScanTask::new);
}
@@ -255,7 +251,7 @@ class BaseTableScan implements TableScan {
.from(fileScanTasks)
.transformAndConcat(input -> input.split(splitSize));
// Capture manifests which can be closed after scan planning
- return CloseableIterable.combine(splitTasks, ImmutableList.of(fileScanTasks));
+ return CloseableIterable.combine(splitTasks, fileScanTasks);
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index d1addc0..de23b7f 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.iceberg.ManifestEntry.Status;
@@ -129,6 +130,11 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
}
}
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
private Evaluator evaluator() {
if (lazyEvaluator == null) {
if (partFilter != null) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 3ca6b54..2da4082 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -107,7 +107,6 @@ class ManifestGroup {
*/
public CloseableIterable<ManifestEntry> entries() {
Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
- List<Closeable> toClose = Lists.newArrayList();
Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
@@ -120,19 +119,20 @@ class ManifestGroup {
manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
}
- Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
+ Iterable<CloseableIterable<ManifestEntry>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader.read(
ops.io().newInputFile(manifest.path()),
ops.current()::spec);
FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
- toClose.add(reader);
- return Iterables.filter(
- ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
- entry -> evaluator.eval((GenericDataFile) entry.file()));
+ return CloseableIterable.combine(
+ Iterables.filter(
+ ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
+ entry -> evaluator.eval((GenericDataFile) entry.file())),
+ reader);
});
- return CloseableIterable.combine(Iterables.concat(readers), toClose);
+ return CloseableIterable.concat(readers);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 32fba63..0a2d87f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index a2fb875..de2e061 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -22,18 +22,21 @@ package org.apache.iceberg.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
-public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
- private final Iterable<Iterable<T>> iterables;
+public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+ private final Iterable<? extends Iterable<T>> iterables;
private final ExecutorService workerPool;
- public ParallelIterable(Iterable<Iterable<T>> iterables,
+ public ParallelIterable(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool) {
this.iterables = iterables;
this.workerPool = workerPool;
@@ -53,12 +56,17 @@ public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private boolean closed = false;
- private ParallelIterator(Iterable<Iterable<T>> iterables,
+ private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool) {
this.tasks = Iterables.transform(iterables, iterable ->
(Runnable) () -> {
- for (T item : iterable) {
- queue.add(item);
+ try (Closeable ignored = (iterable instanceof Closeable) ?
+ (Closeable) iterable : () -> {}) {
+ for (T item : iterable) {
+ queue.add(item);
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close iterable");
}
}).iterator();
this.workerPool = workerPool;