You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/23 18:23:54 UTC
[iceberg] branch master updated: Core: Metadata table code harmonization for readable_metrics (#7613)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7dbdfd33a6 Core: Metadata table code harmonization for readable_metrics (#7613)
7dbdfd33a6 is described below
commit 7dbdfd33a667a721fbb21c7c7d06fec9daa30b88
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 23 11:23:48 2023 -0700
Core: Metadata table code harmonization for readable_metrics (#7613)
---
.../java/org/apache/iceberg/BaseEntriesTable.java | 104 +++++++------------
.../java/org/apache/iceberg/BaseFilesTable.java | 112 +++++++++------------
.../main/java/org/apache/iceberg/MetricsUtil.java | 51 ++++++++++
3 files changed, 136 insertions(+), 131 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
index b8e7331355..43d8a71f87 100644
--- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
@@ -51,8 +51,8 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = ManifestEntry.getSchema(partitionType);
if (partitionType.fields().size() < 1) {
- // avoid returning an empty struct, which is not always supported. instead, drop the partition
- // field (id 102)
+ // avoid returning an empty struct, which is not always supported.
+ // instead, drop the partition field (id 102)
schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
}
@@ -133,16 +133,13 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
if (readableMetricsField == null) {
- CloseableIterable<StructLike> entryAsStruct =
- CloseableIterable.transform(
- entries(fileProjection),
- entry -> (GenericManifestEntry<? extends ContentFile<?>>) entry);
-
StructProjection structProjection = structProjection(projection);
- return CloseableIterable.transform(entryAsStruct, structProjection::wrap);
+
+ return CloseableIterable.transform(
+ entries(fileProjection), entry -> structProjection.wrap((StructLike) entry));
} else {
Schema requiredFileProjection = requiredFileProjection();
- Schema actualProjection = removeReadableMetrics(readableMetricsField);
+ Schema actualProjection = removeReadableMetrics(projection, readableMetricsField);
StructProjection structProjection = structProjection(actualProjection);
return CloseableIterable.transform(
@@ -153,9 +150,7 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
/**
* Ensure that the underlying metrics used to populate readable metrics column are part of the
- * file projection
- *
- * @return file projection with required columns to read readable metrics
+ * file projection.
*/
private Schema requiredFileProjection() {
Schema projectionForReadableMetrics =
@@ -166,9 +161,10 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
return TypeUtil.join(fileProjection, projectionForReadableMetrics);
}
- private Schema removeReadableMetrics(Types.NestedField readableMetricsField) {
+ private Schema removeReadableMetrics(
+ Schema projectionSchema, Types.NestedField readableMetricsField) {
Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
- return TypeUtil.selectNot(projection, readableMetricsIds);
+ return TypeUtil.selectNot(projectionSchema, readableMetricsIds);
}
private StructProjection structProjection(Schema projectedSchema) {
@@ -176,74 +172,48 @@ abstract class BaseEntriesTable extends BaseMetadataTable {
return StructProjection.create(manifestEntrySchema, projectedSchema);
}
+ /**
+ * @param fileStructProjection projection to apply on the 'data_files' struct
+ * @return entries of this read task's manifest
+ */
private CloseableIterable<? extends ManifestEntry<? extends ContentFile<?>>> entries(
- Schema newFileProjection) {
- return ManifestFiles.open(manifest, io, specsById).project(newFileProjection).entries();
+ Schema fileStructProjection) {
+ return ManifestFiles.open(manifest, io, specsById).project(fileStructProjection).entries();
}
+ /**
+ * Given a manifest entry and its projection, append a 'readable_metrics' column that returns
+ * the entry's metrics in human-readable form.
+ *
+ * @param structProjection projection to apply on the manifest entry
+ * @param entry manifest entry
+ * @param readableMetricsField projected "readable_metrics" field
+ * @return struct representing projected manifest entry, with appended readable_metrics field
+ */
private StructLike withReadableMetrics(
StructProjection structProjection,
ManifestEntry<? extends ContentFile<?>> entry,
Types.NestedField readableMetricsField) {
- int projectionColumnCount = projection.columns().size();
- int metricsPosition = projection.columns().indexOf(readableMetricsField);
-
- StructProjection entryStruct = structProjection.wrap((StructLike) entry);
+ StructProjection struct = structProjection.wrap((StructLike) entry);
+ int structSize = projection.columns().size();
- StructType projectedMetricType =
- projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
MetricsUtil.ReadableMetricsStruct readableMetrics =
- MetricsUtil.readableMetricsStruct(dataTableSchema, entry.file(), projectedMetricType);
-
- return new ManifestEntryStructWithMetrics(
- projectionColumnCount, metricsPosition, entryStruct, readableMetrics);
- }
-
- @Override
- public Iterable<FileScanTask> split(long splitSize) {
- return ImmutableList.of(this); // don't split
- }
- }
-
- static class ManifestEntryStructWithMetrics implements StructLike {
- private final StructProjection entryAsStruct;
- private final MetricsUtil.ReadableMetricsStruct readableMetrics;
- private final int projectionColumnCount;
- private final int metricsPosition;
-
- ManifestEntryStructWithMetrics(
- int projectionColumnCount,
- int metricsPosition,
- StructProjection entryAsStruct,
- MetricsUtil.ReadableMetricsStruct readableMetrics) {
- this.entryAsStruct = entryAsStruct;
- this.readableMetrics = readableMetrics;
- this.projectionColumnCount = projectionColumnCount;
- this.metricsPosition = metricsPosition;
- }
+ readableMetrics(entry.file(), readableMetricsField);
+ int metricsPosition = projection.columns().indexOf(readableMetricsField);
- @Override
- public int size() {
- return projectionColumnCount;
+ return new MetricsUtil.StructWithReadableMetrics(
+ struct, structSize, readableMetrics, metricsPosition);
}
- @Override
- public <T> T get(int pos, Class<T> javaClass) {
- if (pos < metricsPosition) {
- return entryAsStruct.get(pos, javaClass);
- } else if (pos == metricsPosition) {
- return javaClass.cast(readableMetrics);
- } else {
- // columnCount = fileAsStruct column count + the readable metrics field.
- // When pos is greater than metricsPosition, the actual position of the field in
- // fileAsStruct should be subtracted by 1.
- return entryAsStruct.get(pos - 1, javaClass);
- }
+ private MetricsUtil.ReadableMetricsStruct readableMetrics(
+ ContentFile<?> file, Types.NestedField readableMetricsField) {
+ StructType projectedMetricType = readableMetricsField.type().asStructType();
+ return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
}
@Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("ManifestEntryStructWithMetrics is read only");
+ public Iterable<FileScanTask> split(long splitSize) {
+ return ImmutableList.of(this); // don't split
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 815e0f0ce3..8df32f1d76 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -49,8 +49,8 @@ abstract class BaseFilesTable extends BaseMetadataTable {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = new Schema(DataFile.getType(partitionType).fields());
if (partitionType.fields().size() < 1) {
- // avoid returning an empty struct, which is not always supported. instead, drop the partition
- // field
+ // avoid returning an empty struct, which is not always supported.
+ // instead, drop the partition field
schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
}
@@ -162,21 +162,10 @@ abstract class BaseFilesTable extends BaseMetadataTable {
if (readableMetricsField == null) {
return CloseableIterable.transform(files(projection), file -> (StructLike) file);
} else {
- // Remove virtual columns from the file projection and ensure that the underlying metrics
- // used to create those columns are part of the file projection
- Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
- Schema fileProjection = TypeUtil.selectNot(projection, readableMetricsIds);
- int metricsPosition = projection.columns().indexOf(readableMetricsField);
-
- Schema projectionForReadableMetrics =
- new Schema(
- MetricsUtil.READABLE_METRIC_COLS.stream()
- .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
- .collect(Collectors.toList()));
-
- Schema projectionForMetrics = TypeUtil.join(fileProjection, projectionForReadableMetrics);
+
+ Schema actualProjection = projectionForReadableMetrics(projection, readableMetricsField);
return CloseableIterable.transform(
- files(projectionForMetrics), f -> withReadableMetrics(f, metricsPosition));
+ files(actualProjection), f -> withReadableMetrics(f, readableMetricsField));
}
}
@@ -192,66 +181,61 @@ abstract class BaseFilesTable extends BaseMetadataTable {
}
}
- private StructLike withReadableMetrics(ContentFile<?> file, int metricsPosition) {
- int columnCount = projection.columns().size();
- StructType projectedMetricType =
- projection.findField(MetricsUtil.READABLE_METRICS).type().asStructType();
+ /**
+ * Given content file metadata, append a 'readable_metrics' column that returns the file's
+ * metrics in human-readable form.
+ *
+ * @param file content file metadata
+ * @param readableMetricsField projected "readable_metrics" field
+ * @return struct representing content file, with appended readable_metrics field
+ */
+ private StructLike withReadableMetrics(
+ ContentFile<?> file, Types.NestedField readableMetricsField) {
+ int structSize = projection.columns().size();
MetricsUtil.ReadableMetricsStruct readableMetrics =
- MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
- return new ContentFileStructWithMetrics(
- columnCount, metricsPosition, (StructLike) file, readableMetrics);
- }
+ readableMetrics(file, readableMetricsField);
+ int metricsPosition = projection.columns().indexOf(readableMetricsField);
- @Override
- public Iterable<FileScanTask> split(long splitSize) {
- return ImmutableList.of(this); // don't split
+ return new MetricsUtil.StructWithReadableMetrics(
+ (StructLike) file, structSize, readableMetrics, metricsPosition);
}
- @VisibleForTesting
- ManifestFile manifest() {
- return manifest;
+ private MetricsUtil.ReadableMetricsStruct readableMetrics(
+ ContentFile<?> file, Types.NestedField readableMetricsField) {
+ StructType projectedMetricType = readableMetricsField.type().asStructType();
+ return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
}
- }
- static class ContentFileStructWithMetrics implements StructLike {
- private final StructLike fileAsStruct;
- private final MetricsUtil.ReadableMetricsStruct readableMetrics;
- private final int columnCount;
- private final int metricsPosition;
-
- ContentFileStructWithMetrics(
- int columnCount,
- int metricsPosition,
- StructLike fileAsStruct,
- MetricsUtil.ReadableMetricsStruct readableMetrics) {
- this.fileAsStruct = fileAsStruct;
- this.readableMetrics = readableMetrics;
- this.columnCount = columnCount;
- this.metricsPosition = metricsPosition;
- }
+ /**
+ * Create a projection on content files metadata by removing the virtual 'readable_metrics'
+ * column and ensuring that the underlying metrics used to create that column are part of the
+ * final projection.
+ *
+ * @param requestedProjection requested projection
+ * @param readableMetricsField readable_metrics field
+ * @return actual projection to be used
+ */
+ private Schema projectionForReadableMetrics(
+ Schema requestedProjection, Types.NestedField readableMetricsField) {
+ Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
+ Schema realProjection = TypeUtil.selectNot(requestedProjection, readableMetricsIds);
- @Override
- public int size() {
- return columnCount;
+ Schema requiredMetricsColumns =
+ new Schema(
+ MetricsUtil.READABLE_METRIC_COLS.stream()
+ .map(MetricsUtil.ReadableMetricColDefinition::originalCol)
+ .collect(Collectors.toList()));
+ return TypeUtil.join(realProjection, requiredMetricsColumns);
}
@Override
- public <T> T get(int pos, Class<T> javaClass) {
- if (pos < metricsPosition) {
- return fileAsStruct.get(pos, javaClass);
- } else if (pos == metricsPosition) {
- return javaClass.cast(readableMetrics);
- } else {
- // columnCount = fileAsStruct column count + the readable metrics field.
- // When pos is greater than metricsPosition, the actual position of the field in
- // fileAsStruct should be subtracted by 1.
- return fileAsStruct.get(pos - 1, javaClass);
- }
+ public Iterable<FileScanTask> split(long splitSize) {
+ return ImmutableList.of(this); // don't split
}
- @Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("ContentFileStructWithMetrics is read only");
+ @VisibleForTesting
+ ManifestFile manifest() {
+ return manifest;
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
index cde9bcb4a0..b631af0fc5 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -356,4 +356,55 @@ public class MetricsUtil {
return new ReadableMetricsStruct(
colMetrics.stream().map(m -> (StructLike) m).collect(Collectors.toList()));
}
+
+ /** Custom struct that returns a 'readable_metrics' column at a specific position */
+ static class StructWithReadableMetrics implements StructLike {
+ private final StructLike struct;
+ private final MetricsUtil.ReadableMetricsStruct readableMetrics;
+ private final int projectionColumnCount;
+ private final int metricsPosition;
+
+ /**
+ * Constructs a struct with readable metrics column
+ *
+ * @param struct struct on which to append 'readable_metrics' struct
+ * @param structSize total number of struct columns, including 'readable_metrics' column
+ * @param readableMetrics struct of 'readable_metrics'
+ * @param metricsPosition position of 'readable_metrics' column
+ */
+ StructWithReadableMetrics(
+ StructLike struct,
+ int structSize,
+ MetricsUtil.ReadableMetricsStruct readableMetrics,
+ int metricsPosition) {
+ this.struct = struct;
+ this.readableMetrics = readableMetrics;
+ this.projectionColumnCount = structSize;
+ this.metricsPosition = metricsPosition;
+ }
+
+ @Override
+ public int size() {
+ return projectionColumnCount;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (pos < metricsPosition) {
+ return struct.get(pos, javaClass);
+ } else if (pos == metricsPosition) {
+ return javaClass.cast(readableMetrics);
+ } else {
+ // projectionColumnCount = wrapped struct's column count + the readable metrics field.
+ // When pos is greater than metricsPosition, the field's actual position in the
+ // wrapped struct is pos - 1.
+ return struct.get(pos - 1, javaClass);
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("StructWithReadableMetrics is read only");
+ }
+ }
}