Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/06 21:57:30 UTC

[incubator-iceberg] branch master updated: Add option to load column stats with data files. (#206)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cb600c6  Add option to load column stats with data files. (#206)
cb600c6 is described below

commit cb600c6f86a3f320648f2dd02e91794eea462a17
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Jun 6 14:57:25 2019 -0700

    Add option to load column stats with data files. (#206)
    
    * Add option to load column stats with data files.
    
    * Fix style problems.
    
    * Rename slimCopy to copyWithoutStats, fix other comments.
    
    * Fix javadoc.
    
    * Fix style.
---
 api/src/main/java/org/apache/iceberg/DataFile.java |   8 ++
 .../main/java/org/apache/iceberg/TableScan.java    |   9 ++
 .../test/java/org/apache/iceberg/TestHelpers.java  |   5 +
 .../main/java/org/apache/iceberg/BaseSnapshot.java |   4 +-
 .../java/org/apache/iceberg/BaseTableScan.java     |  36 ++++--
 .../java/org/apache/iceberg/FilteredManifest.java  |  18 ++-
 .../java/org/apache/iceberg/GenericDataFile.java   |  30 +++--
 .../java/org/apache/iceberg/ManifestEntry.java     |  14 ++-
 .../java/org/apache/iceberg/ManifestReader.java    |   4 +-
 .../apache/iceberg/MergingSnapshotProducer.java    |   2 +-
 .../apache/iceberg/TestScanDataFileColumns.java    | 123 +++++++++++++++++++++
 11 files changed, 223 insertions(+), 30 deletions(-)
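
A minimal end-to-end sketch of the API added by this commit. Loading the
table through HadoopTables and the table location are illustrative
assumptions, not part of the change:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class ScanWithStatsExample {
      public static void main(String[] args) {
        // hypothetical table location, for illustration only
        Table table = new HadoopTables().load("/tmp/example_table");
        // without includeColumnStats(), planning drops the stats columns
        for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
          DataFile file = task.file();
          System.out.println(file.path() + " valueCounts=" + file.valueCounts());
        }
      }
    }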

diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index fed0ac7..456082e 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -139,6 +139,14 @@ public interface DataFile {
   DataFile copy();
 
   /**
+   * Copies this {@link DataFile data file} without file stats. Manifest readers can reuse data file instances; use
+   * this method to copy data without stats when collecting files.
+   *
+   * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
+   */
+  DataFile copyWithoutStats();
+
+  /**
    * @return List of recommended split locations, if applicable, null otherwise.
    * When available, this information is used for planning scan tasks whose boundaries
    * are determined by these offsets. The returned list must be sorted in ascending order.
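
The javadoc above points at the reuse problem this method addresses: a
manifest reader may return the same mutable DataFile instance on every
iteration, so collecting files requires a copy, and copying without stats
keeps the collected list small. A sketch, assuming `reader` iterates
DataFile instances the way Filterable implementations in this module do:

    List<DataFile> collected = Lists.newArrayList();
    for (DataFile file : reader) {
      // the reader may reuse one instance per entry, so copy before collecting;
      // copyWithoutStats() drops the per-column stats maps from the copy
      collected.add(file.copyWithoutStats());
    }
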
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3a9c60f..8636a94 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -76,6 +76,15 @@ public interface TableScan {
   TableScan caseSensitive(boolean caseSensitive);
 
   /**
+   * Create a new {@link TableScan} from this that loads the column stats with each data file.
+   * <p>
+   * Column stats include: value count, null value count, lower bounds, and upper bounds.
+   *
+   * @return a new scan based on this that loads column stats.
+   */
+  TableScan includeColumnStats();
+
+  /**
    * Create a new {@link TableScan} from this that will read the given data columns. This produces
    * an expected schema that includes all fields that are either selected or used by this scan's
    * filter expression.
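
To illustrate the contract (mirrored by the new tests at the end of this
commit): stats are dropped by default and loaded only on request. A sketch,
assuming `table` has data files with stats recorded in its manifests:

    // default scan: stats maps come back null on each DataFile
    for (FileScanTask task : table.newScan().planFiles()) {
      assert task.file().lowerBounds() == null;
    }
    // stats-loading scan: counts and bounds are populated
    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      assert task.file().lowerBounds() != null;
    }
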
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index d92f789..41eceb5 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -378,6 +378,11 @@ public class TestHelpers {
     }
 
     @Override
+    public DataFile copyWithoutStats() {
+      return this;
+    }
+
+    @Override
     public List<Long> splitOffsets() {
       return null;
     }
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 9a4fcf5..b5199b8 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -163,12 +163,12 @@ class BaseSnapshot implements Snapshot {
           ops.current()::spec)) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
-            adds.add(add.file().copy());
+            adds.add(add.file().copyWithoutStats());
           }
         }
         for (ManifestEntry delete : reader.deletedFiles()) {
           if (delete.snapshotId() == snapshotId) {
-            deletes.add(delete.file().copy());
+            deletes.add(delete.file().copyWithoutStats());
           }
         }
       } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index fadde63..302aa16 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -57,11 +57,14 @@ class BaseTableScan implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(TableScan.class);
 
   private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-  private static final List<String> SNAPSHOT_COLUMNS = ImmutableList.of(
+  private static final List<String> SCAN_COLUMNS = ImmutableList.of(
       "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
-      "file_size_in_bytes", "record_count", "partition", "value_counts", "null_value_counts",
-      "lower_bounds", "upper_bounds"
+      "file_size_in_bytes", "record_count", "partition"
   );
+  private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
+      .addAll(SCAN_COLUMNS)
+      .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
+      .build();
   private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
       SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
 
@@ -71,21 +74,24 @@ class BaseTableScan implements TableScan {
   private final Schema schema;
   private final Expression rowFilter;
   private final boolean caseSensitive;
+  private final boolean colStats;
   private final Collection<String> selectedColumns;
   private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
 
   BaseTableScan(TableOperations ops, Table table) {
-    this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, null);
+    this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, false, null);
   }
 
   private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
-                        Expression rowFilter, boolean caseSensitive, Collection<String> selectedColumns) {
+                        Expression rowFilter, boolean caseSensitive, boolean colStats,
+                        Collection<String> selectedColumns) {
     this.ops = ops;
     this.table = table;
     this.snapshotId = snapshotId;
     this.schema = schema;
     this.rowFilter = rowFilter;
     this.caseSensitive = caseSensitive;
+    this.colStats = colStats;
     this.selectedColumns = selectedColumns;
     this.evalCache = Caffeine.newBuilder().build(specId -> {
       PartitionSpec spec = ops.current().spec(specId);
@@ -104,7 +110,7 @@ class BaseTableScan implements TableScan {
         "Cannot override snapshot, already set to id=%s", scanSnapshotId);
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, selectedColumns);
+    return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
   }
 
   @Override
@@ -129,23 +135,29 @@ class BaseTableScan implements TableScan {
 
   @Override
   public TableScan project(Schema projectedSchema) {
-    return new BaseTableScan(ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, selectedColumns);
+    return new BaseTableScan(
+        ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
   }
 
   @Override
   public TableScan caseSensitive(boolean scanCaseSensitive) {
-    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, selectedColumns);
+    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+  }
+
+  @Override
+  public TableScan includeColumnStats() {
+    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
   }
 
   @Override
   public TableScan select(Collection<String> columns) {
-    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, columns);
+    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
   }
 
   @Override
   public TableScan filter(Expression expr) {
-    return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
-                             caseSensitive, selectedColumns);
+    return new BaseTableScan(
+        ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
   }
 
   @Override
@@ -181,7 +193,7 @@ class BaseTableScan implements TableScan {
             String specString = PartitionSpecParser.toJson(spec);
             ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
             return CloseableIterable.transform(
-                reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
+                reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
                 file -> new BaseFileScanTask(file, schemaString, specString, residuals)
             );
           });
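
Like the other refinement methods here, includeColumnStats() returns a new
immutable scan rather than mutating this one; a sketch of the consequence:

    TableScan base = table.newScan();
    TableScan withStats = base.includeColumnStats(); // `base` is unchanged
    // `base` plans with SCAN_COLUMNS; `withStats` also projects value_counts,
    // null_value_counts, lower_bounds, and upper_bounds
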
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index af9ac4e..f4ce193 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -22,9 +22,13 @@ package org.apache.iceberg;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -33,6 +37,9 @@ import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
 import org.apache.iceberg.expressions.Projections;
 
 public class FilteredManifest implements Filterable<FilteredManifest> {
+  private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
+      "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
+
   private final ManifestReader reader;
   private final Expression partFilter;
   private final Expression rowFilter;
@@ -118,12 +125,19 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
+      // ensure stats columns are present for metrics evaluation
+      List<String> projectColumns = Lists.newArrayList(columns);
+      projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
+
+      // if no stats columns were projected, drop them
+      boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
+
       return Iterators.transform(
-          Iterators.filter(reader.iterator(partFilter, columns),
+          Iterators.filter(reader.iterator(partFilter, projectColumns),
               input -> input != null &&
                   evaluator.eval(input.partition()) &&
                   metricsEvaluator.eval(input)),
-          DataFile::copy);
+          dropStats ? DataFile::copyWithoutStats : DataFile::copy);
 
     } else {
       return Iterators.transform(reader.iterator(partFilter, columns), DataFile::copy);
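
The projection above always reads the stats columns so the metrics
evaluator can prune files, then drops stats from the returned copies unless
the caller selected at least one stats column. A worked sketch of the drop
decision, using the same Guava Sets calls:

    Set<String> requested = Sets.newHashSet("file_path", "record_count");
    // no stats column was requested, so copies will drop stats
    boolean dropStats = Sets.intersection(requested, STATS_COLUMNS).isEmpty(); // true
    // adding "value_counts" to `requested` would flip dropStats to false
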
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 7544808..04a1160 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -164,8 +164,9 @@ class GenericDataFile
    * Copy constructor.
    *
    * @param toCopy a generic data file to copy.
+   * @param fullCopy whether to copy all fields or to drop column-level stats
    */
-  private GenericDataFile(GenericDataFile toCopy) {
+  private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
     this.filePath = toCopy.filePath;
     this.format = toCopy.format;
     this.partitionData = toCopy.partitionData.copy();
@@ -174,12 +175,20 @@ class GenericDataFile
     this.fileSizeInBytes = toCopy.fileSizeInBytes;
     this.fileOrdinal = toCopy.fileOrdinal;
     this.sortColumns = copy(toCopy.sortColumns);
-    // TODO: support lazy conversion to/from map
-    this.columnSizes = copy(toCopy.columnSizes);
-    this.valueCounts = copy(toCopy.valueCounts);
-    this.nullValueCounts = copy(toCopy.nullValueCounts);
-    this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
-    this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
+    if (fullCopy) {
+      // TODO: support lazy conversion to/from map
+      this.columnSizes = copy(toCopy.columnSizes);
+      this.valueCounts = copy(toCopy.valueCounts);
+      this.nullValueCounts = copy(toCopy.nullValueCounts);
+      this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
+      this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
+    } else {
+      this.columnSizes = null;
+      this.valueCounts = null;
+      this.nullValueCounts = null;
+      this.lowerBounds = null;
+      this.upperBounds = null;
+    }
     this.fromProjectionPos = toCopy.fromProjectionPos;
     this.keyMetadata = toCopy.keyMetadata == null ? null : ByteBuffers.copy(toCopy.keyMetadata);
     this.splitOffsets = copy(toCopy.splitOffsets);
@@ -414,8 +423,13 @@ class GenericDataFile
   }
 
   @Override
+  public DataFile copyWithoutStats() {
+    return new GenericDataFile(this, false /* drop stats */);
+  }
+
+  @Override
   public DataFile copy() {
-    return new GenericDataFile(this);
+    return new GenericDataFile(this, true /* full copy */);
   }
 
   private static <K, V> Map<K, V> copy(Map<K, V> map) {
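
The resulting contract for the two copy methods, as a sketch (assuming
`file` was read with stats present): copy() retains column sizes, value
counts, null counts, and bounds, while copyWithoutStats() leaves all five
null on the copy.

    DataFile full = file.copy();
    DataFile slim = file.copyWithoutStats();
    assert full.valueCounts() != null && slim.valueCounts() == null;
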
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index dbc2784..bbea3a9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -60,11 +60,15 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
     this.schema = AvroSchemaUtil.convert(getSchema(partitionType), "manifest_entry");
   }
 
-  private ManifestEntry(ManifestEntry toCopy) {
+  private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) {
     this.schema = toCopy.schema;
     this.status = toCopy.status;
     this.snapshotId = toCopy.snapshotId;
-    this.file = toCopy.file().copy();
+    if (fullCopy) {
+      this.file = toCopy.file().copy();
+    } else {
+      this.file = toCopy.file().copyWithoutStats();
+    }
   }
 
   ManifestEntry wrapExisting(long newSnapshotId, DataFile newFile) {
@@ -110,7 +114,11 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
   }
 
   public ManifestEntry copy() {
-    return new ManifestEntry(this);
+    return new ManifestEntry(this, true /* full copy */);
+  }
+
+  public ManifestEntry copyWithoutStats() {
+    return new ManifestEntry(this, false /* drop stats */);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 06d1057..781352e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -183,10 +183,10 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
     for (ManifestEntry entry : entries(CHANGE_COLUNNS)) {
       switch (entry.status()) {
         case ADDED:
-          adds.add(entry.copy());
+          adds.add(entry.copyWithoutStats());
           break;
         case DELETED:
-          deletes.add(entry.copy());
+          deletes.add(entry.copyWithoutStats());
           break;
         default:
       }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index aebded7..c9a72f3 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -434,7 +434,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
             } else {
               // only add the file to deletes if it is a new delete
               // this keeps the snapshot summary accurate for non-duplicate data
-              deletedFiles.add(entry.file().copy());
+              deletedFiles.add(entry.file().copyWithoutStats());
             }
             deletedPaths.add(wrapper);
 
diff --git a/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java b/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java
new file mode 100644
index 0000000..30a8a68
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScanDataFileColumns.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestScanDataFileColumns {
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      optional(2, "data", Types.StringType.get()));
+
+  private static final Configuration CONF = new Configuration();
+  private static final Tables TABLES = new HadoopTables(CONF);
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private String tableLocation = null;
+  private Table table = null;
+
+  @Before
+  public void createTables() throws IOException {
+    File location = temp.newFolder("shared");
+    Assert.assertTrue(location.delete());
+    this.tableLocation = location.toString();
+    this.table = TABLES.create(
+        SCHEMA, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+        tableLocation);
+
+    // commit the test data
+    table.newAppend()
+        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("file1.parquet")
+            .withFileSizeInBytes(100)
+            .withMetrics(new Metrics(3L,
+                null, // no column sizes
+                ImmutableMap.of(1, 3L), // value count
+                ImmutableMap.of(1, 0L), // null count
+                ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
+                ImmutableMap.of(1, longToBuffer(2L)))) // upper bounds
+            .build())
+        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("file2.parquet")
+            .withFileSizeInBytes(100)
+            .withMetrics(new Metrics(3L,
+                null, // no column sizes
+                ImmutableMap.of(1, 3L), // value count
+                ImmutableMap.of(1, 0L), // null count
+                ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
+                ImmutableMap.of(1, longToBuffer(12L)))) // upper bounds
+            .build())
+        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("file3.parquet")
+            .withFileSizeInBytes(100)
+            .withMetrics(new Metrics(3L,
+                null, // no column sizes
+                ImmutableMap.of(1, 3L), // value count
+                ImmutableMap.of(1, 0L), // null count
+                ImmutableMap.of(1, longToBuffer(20L)), // lower bounds
+                ImmutableMap.of(1, longToBuffer(22L)))) // upper bounds
+            .build())
+        .commit();
+  }
+
+  @Test
+  public void testColumnStatsIgnored() {
+    // stats columns should be suppressed by default
+    for (FileScanTask fileTask : table.newScan().planFiles()) {
+      Assert.assertNull(fileTask.file().valueCounts());
+      Assert.assertNull(fileTask.file().nullValueCounts());
+      Assert.assertNull(fileTask.file().lowerBounds());
+      Assert.assertNull(fileTask.file().upperBounds());
+    }
+  }
+
+  @Test
+  public void testColumnStatsLoading() {
+    // stats columns should be loaded when includeColumnStats() is used
+    for (FileScanTask fileTask : table.newScan().includeColumnStats().planFiles()) {
+      Assert.assertNotNull(fileTask.file().valueCounts());
+      Assert.assertNotNull(fileTask.file().nullValueCounts());
+      Assert.assertNotNull(fileTask.file().lowerBounds());
+      Assert.assertNotNull(fileTask.file().upperBounds());
+    }
+  }
+
+  private static ByteBuffer longToBuffer(long value) {
+    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
+  }
+}
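
A note on the longToBuffer helper above: bounds are encoded as 8-byte
little-endian values, matching Iceberg's single-value binary serialization
(production code would decode through Conversions.fromByteBuffer). The
inverse helper is symmetric; a sketch:

    private static long bufferToLong(ByteBuffer buffer) {
      return buffer.order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    }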