You are viewing a plain text version of this content. The canonical HTML version (with the original hyperlink) is available in the Apache mailing-list archives.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/28 02:07:52 UTC
[iceberg] branch master updated: Spark: Add __metadata_col for metadata columns when converting types (#5075)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7d6bbc4cc Spark: Add __metadata_col for metadata columns when converting types (#5075)
7d6bbc4cc is described below
commit 7d6bbc4cc711fea1e7fc2b00a63bbd23895e6b8b
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Tue Jun 28 07:37:47 2022 +0530
Spark: Add __metadata_col for metadata columns when converting types (#5075)
Co-authored-by: Prashant Singh <ps...@amazon.com>
---
.../org/apache/iceberg/spark/TypeToSparkType.java | 18 ++++++++++--
.../apache/iceberg/spark/TestSparkSchemaUtil.java | 33 ++++++++++++++++++++--
2 files changed, 46 insertions(+), 5 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index 6a8be60eb..b12d3e503 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark;
import java.util.List;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
@@ -37,6 +38,7 @@ import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
@@ -46,6 +48,8 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
TypeToSparkType() {
}
+ public static final String METADATA_COL_ATTR_KEY = "__metadata_col";
+
@Override
public DataType schema(Schema schema, DataType structType) {
return structType;
@@ -59,8 +63,8 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
for (int i = 0; i < fields.size(); i += 1) {
Types.NestedField field = fields.get(i);
DataType type = fieldResults.get(i);
- StructField sparkField = StructField.apply(
- field.name(), type, field.isOptional(), Metadata.empty());
+ Metadata metadata = fieldMetadata(field.fieldId());
+ StructField sparkField = StructField.apply(field.name(), type, field.isOptional(), metadata);
if (field.doc() != null) {
sparkField = sparkField.withComment(field.doc());
}
@@ -122,4 +126,14 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
"Cannot convert unknown type to Spark: " + primitive);
}
}
+
+ private Metadata fieldMetadata(int fieldId) {
+ if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
+ return new MetadataBuilder()
+ .putBoolean(METADATA_COL_ATTR_KEY, true)
+ .build();
+ }
+
+ return Metadata.empty();
+ }
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 8bb32c969..40c77cbec 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -20,8 +20,12 @@
package org.apache.iceberg.spark;
import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Test;
@@ -33,8 +37,15 @@ public class TestSparkSchemaUtil {
optional(2, "data", Types.StringType.get())
);
+ private static final Schema TEST_SCHEMA_WITH_METADATA_COLS = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get()),
+ MetadataColumns.FILE_PATH,
+ MetadataColumns.ROW_POSITION
+ );
+
@Test
- public void testEstiamteSizeMaxValue() throws IOException {
+ public void testEstimateSizeMaxValue() throws IOException {
Assert.assertEquals("estimateSize returns Long max value", Long.MAX_VALUE,
SparkSchemaUtil.estimateSize(
null,
@@ -42,14 +53,30 @@ public class TestSparkSchemaUtil {
}
@Test
- public void testEstiamteSizeWithOverflow() throws IOException {
+ public void testEstimateSizeWithOverflow() throws IOException {
long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), Long.MAX_VALUE - 1);
Assert.assertEquals("estimateSize handles overflow", Long.MAX_VALUE, tableSize);
}
@Test
- public void testEstiamteSize() throws IOException {
+ public void testEstimateSize() throws IOException {
long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), 1);
Assert.assertEquals("estimateSize matches with expected approximation", 24, tableSize);
}
+
+ @Test
+ public void testSchemaConversionWithMetaDataColumnSchema() {
+ StructType structType = SparkSchemaUtil.convert(TEST_SCHEMA_WITH_METADATA_COLS);
+ List<AttributeReference> attrRefs = scala.collection.JavaConverters.seqAsJavaList(structType.toAttributes());
+ for (AttributeReference attrRef : attrRefs) {
+ if (MetadataColumns.isMetadataColumn(attrRef.name())) {
+ Assert.assertTrue("metadata columns should have __metadata_col in attribute metadata",
+ attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY) &&
+ attrRef.metadata().getBoolean(TypeToSparkType.METADATA_COL_ATTR_KEY));
+ } else {
+ Assert.assertFalse("non metadata columns should not have __metadata_col in attribute metadata",
+ attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY));
+ }
+ }
+ }
}