You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/06 21:57:30 UTC
[incubator-iceberg] branch master updated: Add option to load
column stats with data files. (#206)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cb600c6 Add option to load column stats with data files. (#206)
cb600c6 is described below
commit cb600c6f86a3f320648f2dd02e91794eea462a17
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Jun 6 14:57:25 2019 -0700
Add option to load column stats with data files. (#206)
* Add option to load column stats with data files.
* Fix style problems.
* Rename slimCopy to copyWithoutStats, fix other comments.
* Fix javadoc.
* Fix style.
---
api/src/main/java/org/apache/iceberg/DataFile.java | 8 ++
.../main/java/org/apache/iceberg/TableScan.java | 9 ++
.../test/java/org/apache/iceberg/TestHelpers.java | 5 +
.../main/java/org/apache/iceberg/BaseSnapshot.java | 4 +-
.../java/org/apache/iceberg/BaseTableScan.java | 36 ++++--
.../java/org/apache/iceberg/FilteredManifest.java | 18 ++-
.../java/org/apache/iceberg/GenericDataFile.java | 30 +++--
.../java/org/apache/iceberg/ManifestEntry.java | 14 ++-
.../java/org/apache/iceberg/ManifestReader.java | 4 +-
.../apache/iceberg/MergingSnapshotProducer.java | 2 +-
.../apache/iceberg/TestScanDataFileColumns.java | 123 +++++++++++++++++++++
11 files changed, 223 insertions(+), 30 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index fed0ac7..456082e 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -139,6 +139,14 @@ public interface DataFile {
DataFile copy();
/**
+ * Copies this {@link DataFile data file} without column stats. Manifest readers can reuse data file instances; use
+ * this method to copy data without stats when collecting files.
+ *
+ * @return a copy of this data file, without column sizes, value counts, null value counts, or lower/upper bounds
+ */
+ DataFile copyWithoutStats();
+
+ /**
* @return List of recommended split locations, if applicable, null otherwise.
* When available, this information is used for planning scan tasks whose boundaries
* are determined by these offsets. The returned list must be sorted in ascending order.
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3a9c60f..8636a94 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -76,6 +76,15 @@ public interface TableScan {
TableScan caseSensitive(boolean caseSensitive);
/**
+ * Create a new {@link TableScan} from this that loads the column stats with each data file.
+ * <p>
+ * Column stats include: value count, null value count, lower bounds, and upper bounds.
+ *
+ * @return a new scan based on this that loads column stats (scans do not load them by default).
+ */
+ TableScan includeColumnStats();
+
+ /**
* Create a new {@link TableScan} from this that will read the given data columns. This produces
* an expected schema that includes all fields that are either selected or used by this scan's
* filter expression.
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index d92f789..41eceb5 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -378,6 +378,11 @@ public class TestHelpers {
}
@Override
+ public DataFile copyWithoutStats() {
+ return this; // NOTE(review): stub returns itself rather than a copy without stats
+ }
+
+ @Override
public List<Long> splitOffsets() {
return null;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 9a4fcf5..b5199b8 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -163,12 +163,12 @@ class BaseSnapshot implements Snapshot {
ops.current()::spec)) {
for (ManifestEntry add : reader.addedFiles()) {
if (add.snapshotId() == snapshotId) {
- adds.add(add.file().copy());
+ adds.add(add.file().copyWithoutStats()); // copy: readers may reuse instances; stats are dropped
}
}
for (ManifestEntry delete : reader.deletedFiles()) {
if (delete.snapshotId() == snapshotId) {
- deletes.add(delete.file().copy());
+ deletes.add(delete.file().copyWithoutStats());
}
}
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index fadde63..302aa16 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -57,11 +57,14 @@ class BaseTableScan implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(TableScan.class);
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- private static final List<String> SNAPSHOT_COLUMNS = ImmutableList.of(
+ private static final List<String> SCAN_COLUMNS = ImmutableList.of(
"snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
- "file_size_in_bytes", "record_count", "partition", "value_counts", "null_value_counts",
- "lower_bounds", "upper_bounds"
+ "file_size_in_bytes", "record_count", "partition"
);
+ private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
+ .addAll(SCAN_COLUMNS)
+ .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
+ .build();
private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
@@ -71,21 +74,24 @@ class BaseTableScan implements TableScan {
private final Schema schema;
private final Expression rowFilter;
private final boolean caseSensitive;
+ private final boolean colStats; // when true, planned files keep column-level stats
private final Collection<String> selectedColumns;
private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
BaseTableScan(TableOperations ops, Table table) {
- this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, null);
+ this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, false, null);
}
private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
- Expression rowFilter, boolean caseSensitive, Collection<String> selectedColumns) {
+ Expression rowFilter, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns) {
this.ops = ops;
this.table = table;
this.snapshotId = snapshotId;
this.schema = schema;
this.rowFilter = rowFilter;
this.caseSensitive = caseSensitive;
+ this.colStats = colStats;
this.selectedColumns = selectedColumns;
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = ops.current().spec(specId);
@@ -104,7 +110,7 @@ class BaseTableScan implements TableScan {
"Cannot override snapshot, already set to id=%s", scanSnapshotId);
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
- return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, selectedColumns);
+ return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
}
@Override
@@ -129,23 +135,29 @@ class BaseTableScan implements TableScan {
@Override
public TableScan project(Schema projectedSchema) {
- return new BaseTableScan(ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, selectedColumns);
+ return new BaseTableScan(
+ ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
}
@Override
public TableScan caseSensitive(boolean scanCaseSensitive) {
- return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, selectedColumns);
+ return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ public TableScan includeColumnStats() {
+ return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
}
@Override
public TableScan select(Collection<String> columns) {
- return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, columns);
+ return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
}
@Override
public TableScan filter(Expression expr) {
- return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
- caseSensitive, selectedColumns);
+ return new BaseTableScan(
+ ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
}
@Override
@@ -181,7 +193,7 @@ class BaseTableScan implements TableScan {
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
return CloseableIterable.transform(
- reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
+ reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
file -> new BaseFileScanTask(file, schemaString, specString, residuals)
);
});
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index af9ac4e..f4ce193 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -22,9 +22,13 @@ package org.apache.iceberg;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
@@ -33,6 +37,9 @@ import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.expressions.Projections;
public class FilteredManifest implements Filterable<FilteredManifest> {
+ private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
+ "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
+
private final ReaderManifest reader;
private final Expression partFilter;
private final Expression rowFilter;
@@ -118,12 +125,19 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
+ // ensure stats columns are present for metrics evaluation
+ List<String> projectColumns = Lists.newArrayList(columns);
+ projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
+
+ // drop stats only if the caller neither selected all columns ("*") nor any stats column
+ boolean dropStats = !columns.contains("*") && STATS_COLUMNS.stream().noneMatch(columns::contains);
+
return Iterators.transform(
- Iterators.filter(reader.iterator(partFilter, columns),
+ Iterators.filter(reader.iterator(partFilter, projectColumns),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
- DataFile::copy);
+ dropStats ? DataFile::copyWithoutStats : DataFile::copy);
} else {
return Iterators.transform(reader.iterator(partFilter, columns), DataFile::copy);
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 7544808..04a1160 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -164,8 +164,9 @@ class GenericDataFile
* Copy constructor.
*
* @param toCopy a generic data file to copy.
+ * @param fullCopy true to copy all fields; false to drop column sizes, value counts, null value counts, and bounds
*/
- private GenericDataFile(GenericDataFile toCopy) {
+ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
this.filePath = toCopy.filePath;
this.format = toCopy.format;
this.partitionData = toCopy.partitionData.copy();
@@ -174,12 +175,20 @@ class GenericDataFile
this.fileSizeInBytes = toCopy.fileSizeInBytes;
this.fileOrdinal = toCopy.fileOrdinal;
this.sortColumns = copy(toCopy.sortColumns);
- // TODO: support lazy conversion to/from map
- this.columnSizes = copy(toCopy.columnSizes);
- this.valueCounts = copy(toCopy.valueCounts);
- this.nullValueCounts = copy(toCopy.nullValueCounts);
- this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
- this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
+ if (fullCopy) {
+ // TODO: support lazy conversion to/from map
+ this.columnSizes = copy(toCopy.columnSizes);
+ this.valueCounts = copy(toCopy.valueCounts);
+ this.nullValueCounts = copy(toCopy.nullValueCounts);
+ this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
+ this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
+ } else {
+ this.columnSizes = null;
+ this.valueCounts = null;
+ this.nullValueCounts = null;
+ this.lowerBounds = null;
+ this.upperBounds = null;
+ }
this.fromProjectionPos = toCopy.fromProjectionPos;
this.keyMetadata = toCopy.keyMetadata == null ? null : ByteBuffers.copy(toCopy.keyMetadata);
this.splitOffsets = copy(toCopy.splitOffsets);
@@ -414,8 +423,13 @@ class GenericDataFile
}
@Override
+ public DataFile copyWithoutStats() {
+ return new GenericDataFile(this, false /* drop stats */);
+ }
+
+ @Override
public DataFile copy() {
- return new GenericDataFile(this);
+ return new GenericDataFile(this, true /* full copy */);
}
private static <K, V> Map<K, V> copy(Map<K, V> map) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index dbc2784..bbea3a9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -60,11 +60,15 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
this.schema = AvroSchemaUtil.convert(getSchema(partitionType), "manifest_entry");
}
- private ManifestEntry(ManifestEntry toCopy) {
+ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { // fullCopy=false copies the file without stats
this.schema = toCopy.schema;
this.status = toCopy.status;
this.snapshotId = toCopy.snapshotId;
- this.file = toCopy.file().copy();
+ if (fullCopy) {
+ this.file = toCopy.file().copy();
+ } else {
+ this.file = toCopy.file().copyWithoutStats();
+ }
}
ManifestEntry wrapExisting(long newSnapshotId, DataFile newFile) {
@@ -110,7 +114,11 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
}
public ManifestEntry copy() {
- return new ManifestEntry(this);
+ return new ManifestEntry(this, true /* full copy */);
+ }
+
+ public ManifestEntry copyWithoutStats() {
+ return new ManifestEntry(this, false /* drop stats */);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 06d1057..781352e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -183,10 +183,10 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
for (ManifestEntry entry : entries(CHANGE_COLUNNS)) {
switch (entry.status()) {
case ADDED:
- adds.add(entry.copy());
+ adds.add(entry.copyWithoutStats()); // NOTE(review): collected entries omit column stats
break;
case DELETED:
- deletes.add(entry.copy());
+ deletes.add(entry.copyWithoutStats());
break;
default:
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index aebded7..c9a72f3 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -434,7 +434,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
- deletedFiles.add(entry.file().copy());
+ deletedFiles.add(entry.file().copyWithoutStats()); // NOTE(review): deletedFiles entries omit column stats
}
deletedPaths.add(wrapper);
diff --git a/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java b/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java
new file mode 100644
index 0000000..30a8a68
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestScanDataFileColumns {
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "data", Types.StringType.get()));
+
+ private static final Configuration CONF = new Configuration();
+ private static final Tables TABLES = new HadoopTables(CONF);
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ private String tableLocation = null;
+ private Table table = null;
+
+ @Before
+ public void createTables() throws IOException {
+ File location = temp.newFolder("shared");
+ Assert.assertTrue(location.delete());
+ this.tableLocation = location.toString();
+ this.table = TABLES.create(
+ SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+ tableLocation);
+
+ // commit the test data
+ table.newAppend()
+ .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("file1.parquet")
+ .withFileSizeInBytes(100)
+ .withMetrics(new Metrics(3L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
+ ImmutableMap.of(1, longToBuffer(2L)))) // upper bounds
+ .build())
+ .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("file2.parquet")
+ .withFileSizeInBytes(100)
+ .withMetrics(new Metrics(3L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+ ImmutableMap.of(1, longToBuffer(12L)))) // upper bounds
+ .build())
+ .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("file3.parquet")
+ .withFileSizeInBytes(100)
+ .withMetrics(new Metrics(3L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ ImmutableMap.of(1, longToBuffer(20L)), // lower bounds
+ ImmutableMap.of(1, longToBuffer(22L)))) // upper bounds
+ .build())
+ .commit();
+ }
+
+ @Test
+ public void testColumnStatsIgnored() {
+ // stats columns should be suppressed by default
+ for (FileScanTask fileTask : table.newScan().planFiles()) {
+ Assert.assertNull(fileTask.file().valueCounts());
+ Assert.assertNull(fileTask.file().nullValueCounts());
+ Assert.assertNull(fileTask.file().lowerBounds());
+ Assert.assertNull(fileTask.file().upperBounds());
+ }
+ }
+
+ @Test
+ public void testColumnStatsLoading() {
+ // stats columns should be loaded when requested with includeColumnStats()
+ for (FileScanTask fileTask : table.newScan().includeColumnStats().planFiles()) {
+ Assert.assertNotNull(fileTask.file().valueCounts());
+ Assert.assertNotNull(fileTask.file().nullValueCounts());
+ Assert.assertNotNull(fileTask.file().lowerBounds());
+ Assert.assertNotNull(fileTask.file().upperBounds());
+ }
+ }
+
+ private static ByteBuffer longToBuffer(long value) {
+ return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
+ }
+}