Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/28 02:07:52 UTC

[iceberg] branch master updated: Spark: Add __metadata_col for metadata columns when converting types (#5075)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d6bbc4cc Spark: Add __metadata_col for metadata columns when converting types (#5075)
7d6bbc4cc is described below

commit 7d6bbc4cc711fea1e7fc2b00a63bbd23895e6b8b
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Tue Jun 28 07:37:47 2022 +0530

    Spark: Add __metadata_col for metadata columns when converting types (#5075)
    
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../org/apache/iceberg/spark/TypeToSparkType.java  | 18 ++++++++++--
 .../apache/iceberg/spark/TestSparkSchemaUtil.java  | 33 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 5 deletions(-)
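
In effect: when an Iceberg schema is converted to a Spark StructType, any field whose id belongs to an Iceberg metadata column (for example _file or _pos) now carries a __metadata_col attribute in its StructField metadata, so converted schemas can distinguish metadata columns from data columns. A minimal sketch of the attribute the converter attaches (the key matches the new METADATA_COL_ATTR_KEY constant; ordinary data columns keep Metadata.empty()):

    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;

    // What fieldMetadata() produces for a metadata-column field id:
    Metadata tagged = new MetadataBuilder()
        .putBoolean("__metadata_col", true)  // TypeToSparkType.METADATA_COL_ATTR_KEY
        .build();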

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index 6a8be60eb..b12d3e503 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark;
 
 import java.util.List;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
@@ -37,6 +38,7 @@ import org.apache.spark.sql.types.IntegerType$;
 import org.apache.spark.sql.types.LongType$;
 import org.apache.spark.sql.types.MapType$;
 import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType$;
@@ -46,6 +48,8 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
   TypeToSparkType() {
   }
 
+  public static final String METADATA_COL_ATTR_KEY = "__metadata_col";
+
   @Override
   public DataType schema(Schema schema, DataType structType) {
     return structType;
@@ -59,8 +63,8 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
     for (int i = 0; i < fields.size(); i += 1) {
       Types.NestedField field = fields.get(i);
       DataType type = fieldResults.get(i);
-      StructField sparkField = StructField.apply(
-          field.name(), type, field.isOptional(), Metadata.empty());
+      Metadata metadata = fieldMetadata(field.fieldId());
+      StructField sparkField = StructField.apply(field.name(), type, field.isOptional(), metadata);
       if (field.doc() != null) {
         sparkField = sparkField.withComment(field.doc());
       }
@@ -122,4 +126,14 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
             "Cannot convert unknown type to Spark: " + primitive);
     }
   }
+
+  private Metadata fieldMetadata(int fieldId) {
+    if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
+      return new MetadataBuilder()
+          .putBoolean(METADATA_COL_ATTR_KEY, true)
+          .build();
+    }
+
+    return Metadata.empty();
+  }
 }
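
For illustration, a minimal sketch (not part of this commit; the class name is hypothetical) of how the tag surfaces through SparkSchemaUtil.convert. The string literal mirrors METADATA_COL_ATTR_KEY, since the TypeToSparkType class itself is package-private:

    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class MetadataColTagExample {
      public static void main(String[] args) {
        // one data column plus the _file metadata column
        Schema schema = new Schema(
            Types.NestedField.optional(1, "id", Types.LongType.get()),
            MetadataColumns.FILE_PATH);

        StructType sparkType = SparkSchemaUtil.convert(schema);
        for (StructField field : sparkType.fields()) {
          boolean tagged = field.metadata().contains("__metadata_col");
          System.out.println(field.name() + " -> " + tagged);  // id -> false, _file -> true
        }
      }
    }
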
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 8bb32c969..40c77cbec 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -20,8 +20,12 @@
 package org.apache.iceberg.spark;
 
 import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,8 +37,15 @@ public class TestSparkSchemaUtil {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema TEST_SCHEMA_WITH_METADATA_COLS = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      MetadataColumns.FILE_PATH,
+      MetadataColumns.ROW_POSITION
+  );
+
   @Test
-  public void testEstiamteSizeMaxValue() throws IOException {
+  public void testEstimateSizeMaxValue() throws IOException {
     Assert.assertEquals("estimateSize returns Long max value", Long.MAX_VALUE,
         SparkSchemaUtil.estimateSize(
             null,
@@ -42,14 +53,30 @@ public class TestSparkSchemaUtil {
   }
 
   @Test
-  public void testEstiamteSizeWithOverflow() throws IOException {
+  public void testEstimateSizeWithOverflow() throws IOException {
     long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), Long.MAX_VALUE - 1);
     Assert.assertEquals("estimateSize handles overflow", Long.MAX_VALUE, tableSize);
   }
 
   @Test
-  public void testEstiamteSize() throws IOException {
+  public void testEstimateSize() throws IOException {
     long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), 1);
     Assert.assertEquals("estimateSize matches with expected approximation", 24, tableSize);
   }
+
+  @Test
+  public void testSchemaConversionWithMetaDataColumnSchema() {
+    StructType structType = SparkSchemaUtil.convert(TEST_SCHEMA_WITH_METADATA_COLS);
+    List<AttributeReference> attrRefs = scala.collection.JavaConverters.seqAsJavaList(structType.toAttributes());
+    for (AttributeReference attrRef : attrRefs) {
+      if (MetadataColumns.isMetadataColumn(attrRef.name())) {
+        Assert.assertTrue("metadata columns should have __metadata_col in attribute metadata",
+            attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY) &&
+                attrRef.metadata().getBoolean(TypeToSparkType.METADATA_COL_ATTR_KEY));
+      } else {
+        Assert.assertFalse("non metadata columns should not have __metadata_col in attribute metadata",
+            attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY));
+      }
+    }
+  }
 }