You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/23 18:23:54 UTC

[iceberg] branch master updated: Core: Metadata table code harmonization for readable_metrics (#7613)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dbdfd33a6 Core: Metadata table code harmonization for readable_metrics (#7613)
7dbdfd33a6 is described below

commit 7dbdfd33a667a721fbb21c7c7d06fec9daa30b88
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 23 11:23:48 2023 -0700

    Core: Metadata table code harmonization for readable_metrics (#7613)
---
 .../java/org/apache/iceberg/BaseEntriesTable.java  | 104 +++++++------------
 .../java/org/apache/iceberg/BaseFilesTable.java    | 112 +++++++++------------
 .../main/java/org/apache/iceberg/MetricsUtil.java  |  51 ++++++++++
 3 files changed, 136 insertions(+), 131 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
index b8e7331355..43d8a71f87 100644
--- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
@@ -51,8 +51,8 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
     StructType partitionType = Partitioning.partitionType(table());
     Schema schema = ManifestEntry.getSchema(partitionType);
     if (partitionType.fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition
-      // field (id 102)
+      // avoid returning an empty struct, which is not always supported.
+      // instead, drop the partition field (id 102)
       schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
     }
 
@@ -133,16 +133,13 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
       Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
 
       if (readableMetricsField == null) {
-        CloseableIterable<StructLike> entryAsStruct =
-            CloseableIterable.transform(
-                entries(fileProjection),
-                entry -> (GenericManifestEntry<? extends ContentFile<?>>) entry);
-
         StructProjection structProjection = structProjection(projection);
-        return CloseableIterable.transform(entryAsStruct, structProjection::wrap);
+
+        return CloseableIterable.transform(
+            entries(fileProjection), entry -> structProjection.wrap((StructLike) entry));
       } else {
         Schema requiredFileProjection = requiredFileProjection();
-        Schema actualProjection = removeReadableMetrics(readableMetricsField);
+        Schema actualProjection = removeReadableMetrics(projection, readableMetricsField);
         StructProjection structProjection = structProjection(actualProjection);
 
         return CloseableIterable.transform(
@@ -153,9 +150,7 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
 
     /**
      * Ensure that the underlying metrics used to populate readable metrics column are part of the
-     * file projection
-     *
-     * @return file projection with required columns to read readable metrics
+     * file projection.
      */
     private Schema requiredFileProjection() {
       Schema projectionForReadableMetrics =
@@ -166,9 +161,10 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
       return TypeUtil.join(fileProjection, projectionForReadableMetrics);
     }
 
-    private Schema removeReadableMetrics(Types.NestedField readableMetricsField) {
+    private Schema removeReadableMetrics(
+        Schema projectionSchema, Types.NestedField readableMetricsField) {
       Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
-      return TypeUtil.selectNot(projection, readableMetricsIds);
+      return TypeUtil.selectNot(projectionSchema, readableMetricsIds);
     }
 
     private StructProjection structProjection(Schema projectedSchema) {
@@ -176,74 +172,48 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
       return StructProjection.create(manifestEntrySchema, projectedSchema);
     }
 
+    /**
+     * @param fileStructProjection projection to apply on the 'data_file' struct
+     * @return entries of this read task's manifest
+     */
     private CloseableIterable<? extends ManifestEntry<? extends ContentFile<?>>> entries(
-        Schema newFileProjection) {
-      return ManifestFiles.open(manifest, io, specsById).project(newFileProjection).entries();
+        Schema fileStructProjection) {
+      return ManifestFiles.open(manifest, io, specsById).project(fileStructProjection).entries();
     }
 
+    /**
+     * Given a manifest entry and its projection, append a 'readable_metrics' column that returns
+     * the entry's metrics in human-readable form.
+     *
+     * @param entry manifest entry
+     * @param structProjection projection to apply on the manifest entry
+     * @param readableMetricsField projected "readable_metrics" field
+     * @return struct representing projected manifest entry, with appended readable_metrics field
+     */
     private StructLike withReadableMetrics(
         StructProjection structProjection,
         ManifestEntry<? extends ContentFile<?>> entry,
         Types.NestedField readableMetricsField) {
-      int projectionColumnCount = projection.columns().size();
-      int metricsPosition = projection.columns().indexOf(readableMetricsField);
-
-      StructProjection entryStruct = structProjection.wrap((StructLike) entry);
+      StructProjection struct = structProjection.wrap((StructLike) entry);
+      int structSize = projection.columns().size();
 
-      StructType projectedMetricType =
-          projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
       MetricsUtil.ReadableMetricsStruct readableMetrics =
-          MetricsUtil.readableMetricsStruct(dataTableSchema, entry.file(), projectedMetricType);
-
-      return new ManifestEntryStructWithMetrics(
-          projectionColumnCount, metricsPosition, entryStruct, readableMetrics);
-    }
-
-    @Override
-    public Iterable<FileScanTask> split(long splitSize) {
-      return ImmutableList.of(this); // don't split
-    }
-  }
-
-  static class ManifestEntryStructWithMetrics implements StructLike {
-    private final StructProjection entryAsStruct;
-    private final MetricsUtil.ReadableMetricsStruct readableMetrics;
-    private final int projectionColumnCount;
-    private final int metricsPosition;
-
-    ManifestEntryStructWithMetrics(
-        int projectionColumnCount,
-        int metricsPosition,
-        StructProjection entryAsStruct,
-        MetricsUtil.ReadableMetricsStruct readableMetrics) {
-      this.entryAsStruct = entryAsStruct;
-      this.readableMetrics = readableMetrics;
-      this.projectionColumnCount = projectionColumnCount;
-      this.metricsPosition = metricsPosition;
-    }
+          readableMetrics(entry.file(), readableMetricsField);
+      int metricsPosition = projection.columns().indexOf(readableMetricsField);
 
-    @Override
-    public int size() {
-      return projectionColumnCount;
+      return new MetricsUtil.StructWithReadableMetrics(
+          struct, structSize, readableMetrics, metricsPosition);
     }
 
-    @Override
-    public <T> T get(int pos, Class<T> javaClass) {
-      if (pos < metricsPosition) {
-        return entryAsStruct.get(pos, javaClass);
-      } else if (pos == metricsPosition) {
-        return javaClass.cast(readableMetrics);
-      } else {
-        // columnCount = fileAsStruct column count + the readable metrics field.
-        // When pos is greater than metricsPosition, the actual position of the field in
-        // fileAsStruct should be subtracted by 1.
-        return entryAsStruct.get(pos - 1, javaClass);
-      }
+    private MetricsUtil.ReadableMetricsStruct readableMetrics(
+        ContentFile<?> file, Types.NestedField readableMetricsField) {
+      StructType projectedMetricType = readableMetricsField.type().asStructType();
+      return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
     }
 
     @Override
-    public <T> void set(int pos, T value) {
-      throw new UnsupportedOperationException("ManifestEntryStructWithMetrics is read only");
+    public Iterable<FileScanTask> split(long splitSize) {
+      return ImmutableList.of(this); // don't split
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 815e0f0ce3..8df32f1d76 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -49,8 +49,8 @@ abstract class BaseFilesTable extends BaseMetadataTable {
     StructType partitionType = Partitioning.partitionType(table());
     Schema schema = new Schema(DataFile.getType(partitionType).fields());
     if (partitionType.fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition
-      // field
+      // avoid returning an empty struct, which is not always supported.
+      // instead, drop the partition field
       schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
     }
 
@@ -162,21 +162,10 @@ abstract class BaseFilesTable extends BaseMetadataTable {
       if (readableMetricsField == null) {
         return CloseableIterable.transform(files(projection), file -> (StructLike) file);
       } else {
-        // Remove virtual columns from the file projection and ensure that the underlying metrics
-        // used to create those columns are part of the file projection
-        Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
-        Schema fileProjection = TypeUtil.selectNot(projection, readableMetricsIds);
-        int metricsPosition = projection.columns().indexOf(readableMetricsField);
-
-        Schema projectionForReadableMetrics =
-            new Schema(
-                MetricsUtil.READABLE_METRIC_COLS.stream()
-                    .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
-                    .collect(Collectors.toList()));
-
-        Schema projectionForMetrics = TypeUtil.join(fileProjection, projectionForReadableMetrics);
+
+        Schema actualProjection = projectionForReadableMetrics(projection, readableMetricsField);
         return CloseableIterable.transform(
-            files(projectionForMetrics), f -> withReadableMetrics(f, metricsPosition));
+            files(actualProjection), f -> withReadableMetrics(f, readableMetricsField));
       }
     }
 
@@ -192,66 +181,61 @@ abstract class BaseFilesTable extends BaseMetadataTable {
       }
     }
 
-    private StructLike withReadableMetrics(ContentFile<?> file, int metricsPosition) {
-      int columnCount = projection.columns().size();
-      StructType projectedMetricType =
-          projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
+    /**
+     * Given content file metadata, append a 'readable_metrics' column that returns the file's
+     * metrics in human-readable form.
+     *
+     * @param file content file metadata
+     * @param readableMetricsField projected "readable_metrics" field
+     * @return struct representing content file, with appended readable_metrics field
+     */
+    private StructLike withReadableMetrics(
+        ContentFile<?> file, Types.NestedField readableMetricsField) {
+      int structSize = projection.columns().size();
       MetricsUtil.ReadableMetricsStruct readableMetrics =
-          MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
-      return new ContentFileStructWithMetrics(
-          columnCount, metricsPosition, (StructLike) file, readableMetrics);
-    }
+          readableMetrics(file, readableMetricsField);
+      int metricsPosition = projection.columns().indexOf(readableMetricsField);
 
-    @Override
-    public Iterable<FileScanTask> split(long splitSize) {
-      return ImmutableList.of(this); // don't split
+      return new MetricsUtil.StructWithReadableMetrics(
+          (StructLike) file, structSize, readableMetrics, metricsPosition);
     }
 
-    @VisibleForTesting
-    ManifestFile manifest() {
-      return manifest;
+    private MetricsUtil.ReadableMetricsStruct readableMetrics(
+        ContentFile<?> file, Types.NestedField readableMetricsField) {
+      StructType projectedMetricType = readableMetricsField.type().asStructType();
+      return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
     }
-  }
 
-  static class ContentFileStructWithMetrics implements StructLike {
-    private final StructLike fileAsStruct;
-    private final MetricsUtil.ReadableMetricsStruct readableMetrics;
-    private final int columnCount;
-    private final int metricsPosition;
-
-    ContentFileStructWithMetrics(
-        int columnCount,
-        int metricsPosition,
-        StructLike fileAsStruct,
-        MetricsUtil.ReadableMetricsStruct readableMetrics) {
-      this.fileAsStruct = fileAsStruct;
-      this.readableMetrics = readableMetrics;
-      this.columnCount = columnCount;
-      this.metricsPosition = metricsPosition;
-    }
+    /**
+     * Create a projection on content files metadata by removing virtual 'readable_metrics' and
+     * ensuring that the underlying metrics used to create that column are part of the final
+     * projection.
+     *
+     * @param requestedProjection requested projection
+     * @param readableMetricsField readable_metrics field
+     * @return actual projection to be used
+     */
+    private Schema projectionForReadableMetrics(
+        Schema requestedProjection, Types.NestedField readableMetricsField) {
+      Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
+      Schema realProjection = TypeUtil.selectNot(requestedProjection, readableMetricsIds);
 
-    @Override
-    public int size() {
-      return columnCount;
+      Schema requiredMetricsColumns =
+          new Schema(
+              MetricsUtil.READABLE_METRIC_COLS.stream()
+                  .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
+                  .collect(Collectors.toList()));
+      return TypeUtil.join(realProjection, requiredMetricsColumns);
     }
 
     @Override
-    public <T> T get(int pos, Class<T> javaClass) {
-      if (pos < metricsPosition) {
-        return fileAsStruct.get(pos, javaClass);
-      } else if (pos == metricsPosition) {
-        return javaClass.cast(readableMetrics);
-      } else {
-        // columnCount = fileAsStruct column count + the readable metrics field.
-        // When pos is greater than metricsPosition, the actual position of the field in
-        // fileAsStruct should be subtracted by 1.
-        return fileAsStruct.get(pos - 1, javaClass);
-      }
+    public Iterable<FileScanTask> split(long splitSize) {
+      return ImmutableList.of(this); // don't split
     }
 
-    @Override
-    public <T> void set(int pos, T value) {
-      throw new UnsupportedOperationException("ContentFileStructWithMetrics is read only");
+    @VisibleForTesting
+    ManifestFile manifest() {
+      return manifest;
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index cde9bcb4a0..b631af0fc5 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -356,4 +356,55 @@ public class MetricsUtil {
     return new ReadableMetricsStruct(
         colMetrics.stream().map(m -> (StructLike) m).collect(Collectors.toList()));
   }
+
+  /** Custom struct that returns a 'readable_metrics' column at a specific position. */
+  static class StructWithReadableMetrics implements StructLike {
+    private final StructLike struct;
+    private final MetricsUtil.ReadableMetricsStruct readableMetrics;
+    private final int projectionColumnCount;
+    private final int metricsPosition;
+
+    /**
+     * Constructs a struct with readable metrics column
+     *
+     * @param struct struct on which to append 'readable_metrics' struct
+     * @param structSize total number of struct columns, including 'readable_metrics' column
+     * @param readableMetrics struct of 'readable_metrics'
+     * @param metricsPosition position of 'readable_metrics' column
+     */
+    StructWithReadableMetrics(
+        StructLike struct,
+        int structSize,
+        MetricsUtil.ReadableMetricsStruct readableMetrics,
+        int metricsPosition) {
+      this.struct = struct;
+      this.readableMetrics = readableMetrics;
+      this.projectionColumnCount = structSize;
+      this.metricsPosition = metricsPosition;
+    }
+
+    @Override
+    public int size() {
+      return projectionColumnCount;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      if (pos < metricsPosition) {
+        return struct.get(pos, javaClass);
+      } else if (pos == metricsPosition) {
+        return javaClass.cast(readableMetrics);
+      } else {
+        // size() = wrapped struct's column count + the readable metrics field.
+        // When pos is greater than metricsPosition, the actual position of the field in
+        // the wrapped struct is pos - 1.
+        return struct.get(pos - 1, javaClass);
+      }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("StructWithReadableMetrics is read only");
+    }
+  }
 }