You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/27 18:41:33 UTC
[iceberg] branch master updated: Spark: Support spec ID and partition metadata columns (#2984)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7cf96f0 Spark: Support spec ID and partition metadata columns (#2984)
7cf96f0 is described below
commit 7cf96f096edeceac13916676c273954fdd435513
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Sep 27 11:41:20 2021 -0700
Spark: Support spec ID and partition metadata columns (#2984)
---
.../org/apache/iceberg/util/StructProjection.java | 33 +++-
.../java/org/apache/iceberg/MetadataColumns.java | 35 +++-
.../org/apache/iceberg/util/PartitionUtil.java | 30 +++-
.../iceberg/spark/source/BaseDataReader.java | 47 ++++-
.../iceberg/spark/source/BatchDataReader.java | 5 +-
.../spark/source/EqualityDeleteRowReader.java | 3 +-
.../apache/iceberg/spark/source/RowDataReader.java | 5 +-
.../spark/source/TestSparkBaseDataReader.java | 23 +--
.../iceberg/spark/source/SparkScanBuilder.java | 2 +-
.../iceberg/spark/source/SparkTestTable.java | 60 +++++++
.../iceberg/spark/source/TestSparkCatalog.java | 12 +-
.../spark/source/TestSparkMetadataColumns.java | 194 +++++++++++++++++++++
12 files changed, 407 insertions(+), 42 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index be05b0f..c18f69f 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -58,12 +58,30 @@ public class StructProjection implements StructLike {
return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
}
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ * <p>
+ * This projection allows missing fields and does not work with repeated types like lists and maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection createAllowMissing(StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType, true);
+ }
+
private final StructType type;
private final int[] positionMap;
private final StructProjection[] nestedProjections;
private StructLike struct;
private StructProjection(StructType structType, StructType projection) {
+ this(structType, projection, false);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
this.type = projection;
this.positionMap = new int[projection.fields().size()];
this.nestedProjections = new StructProjection[projection.fields().size()];
@@ -116,7 +134,10 @@ public class StructProjection implements StructLike {
}
}
- if (!found) {
+ if (!found && projectedField.isOptional() && allowMissing) {
+ positionMap[pos] = -1;
+ nestedProjections[pos] = null;
+ } else if (!found) {
throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType));
}
}
@@ -134,11 +155,17 @@ public class StructProjection implements StructLike {
@Override
public <T> T get(int pos, Class<T> javaClass) {
+ int structPos = positionMap[pos];
+
if (nestedProjections[pos] != null) {
- return javaClass.cast(nestedProjections[pos].wrap(struct.get(positionMap[pos], StructLike.class)));
+ return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class)));
}
- return struct.get(positionMap[pos], javaClass);
+ if (structPos != -1) {
+ return struct.get(structPos, javaClass);
+ } else {
+ return null;
+ }
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index e1cf096..af7b655 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -38,6 +38,12 @@ public class MetadataColumns {
Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
public static final NestedField IS_DELETED = NestedField.required(
Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+ public static final NestedField SPEC_ID = NestedField.required(
+ Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID used to track the file containing a row");
+ // the partition column type is not static and depends on all specs in the table
+ public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+ public static final String PARTITION_COLUMN_NAME = "_partition";
+ public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";
// IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -51,24 +57,39 @@ public class MetadataColumns {
private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION,
- IS_DELETED.name(), IS_DELETED);
+ IS_DELETED.name(), IS_DELETED,
+ SPEC_ID.name(), SPEC_ID
+ );
- private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
- .collect(ImmutableSet.toImmutableSet());
+ private static final Set<Integer> META_IDS = ImmutableSet.of(
+ FILE_PATH.fieldId(),
+ ROW_POSITION.fieldId(),
+ IS_DELETED.fieldId(),
+ SPEC_ID.fieldId(),
+ PARTITION_COLUMN_ID
+ );
public static Set<Integer> metadataFieldIds() {
return META_IDS;
}
- public static NestedField get(String name) {
- return META_COLUMNS.get(name);
+ public static NestedField metadataColumn(Table table, String name) {
+ if (name.equals(PARTITION_COLUMN_NAME)) {
+ return Types.NestedField.optional(
+ PARTITION_COLUMN_ID,
+ PARTITION_COLUMN_NAME,
+ Partitioning.partitionType(table),
+ PARTITION_COLUMN_DOC);
+ } else {
+ return META_COLUMNS.get(name);
+ }
}
public static boolean isMetadataColumn(String name) {
- return META_COLUMNS.containsKey(name);
+ return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
}
public static boolean nonMetadataColumn(String name) {
- return !META_COLUMNS.containsKey(name);
+ return !isMetadataColumn(name);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 929f77a..02c8b30 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -36,10 +36,15 @@ public class PartitionUtil {
}
public static Map<Integer, ?> constantsMap(FileScanTask task) {
- return constantsMap(task, (type, constant) -> constant);
+ return constantsMap(task, null, (type, constant) -> constant);
}
public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+ return constantsMap(task, null, convertConstant);
+ }
+
+ public static Map<Integer, ?> constantsMap(FileScanTask task, Types.StructType partitionType,
+ BiFunction<Type, Object, Object> convertConstant) {
PartitionSpec spec = task.spec();
StructLike partitionData = task.file().partition();
@@ -51,6 +56,22 @@ public class PartitionUtil {
MetadataColumns.FILE_PATH.fieldId(),
convertConstant.apply(Types.StringType.get(), task.file().path()));
+ // add _spec_id
+ idToConstant.put(
+ MetadataColumns.SPEC_ID.fieldId(),
+ convertConstant.apply(Types.IntegerType.get(), task.file().specId()));
+
+ // add _partition
+ if (partitionType != null) {
+ if (partitionType.fields().size() > 0) {
+ StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
+ idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));
+ } else {
+ // use null as some query engines may not be able to handle empty structs
+ idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);
+ }
+ }
+
List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
@@ -63,4 +84,11 @@ public class PartitionUtil {
return idToConstant;
}
+
+ // adapts the provided partition data to match the table partition type
+ private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
+ StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType);
+ projection.wrap(partition);
+ return projection;
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index c8b33dd..b58745c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -24,24 +24,32 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
@@ -55,6 +63,7 @@ import org.slf4j.LoggerFactory;
abstract class BaseDataReader<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
+ private final Table table;
private final Iterator<FileScanTask> tasks;
private final Map<String, InputFile> inputFiles;
@@ -62,17 +71,18 @@ abstract class BaseDataReader<T> implements Closeable {
private T current = null;
private FileScanTask currentTask = null;
- BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+ BaseDataReader(Table table, CombinedScanTask task) {
+ this.table = table;
this.tasks = task.files().iterator();
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
- .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+ .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
- Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+ Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
@@ -132,6 +142,15 @@ abstract class BaseDataReader<T> implements Closeable {
return inputFiles.get(location);
}
+ protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+ if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+ StructType partitionType = Partitioning.partitionType(table);
+ return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+ } else {
+ return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
+ }
+ }
+
protected static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
@@ -155,6 +174,24 @@ abstract class BaseDataReader<T> implements Closeable {
return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY:
return ByteBuffers.toByteArray((ByteBuffer) value);
+ case STRUCT:
+ StructType structType = (StructType) type;
+
+ if (structType.fields().isEmpty()) {
+ return new GenericInternalRow();
+ }
+
+ List<NestedField> fields = structType.fields();
+ Object[] values = new Object[fields.size()];
+ StructLike struct = (StructLike) value;
+
+ for (int index = 0; index < fields.size(); index++) {
+ NestedField field = fields.get(index);
+ Type fieldType = field.type();
+ values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass()));
+ }
+
+ return new GenericInternalRow(values);
default:
}
return value;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 8cfe46b..e4bd3ce 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -52,7 +51,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final int batchSize;
BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
- super(task, table.io(), table.encryption());
+ super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
@@ -66,7 +65,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
CloseableIterable<ColumnarBatch> iter;
InputFile location = getInputFile(task);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index d4328ad..ce2226f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -26,7 +26,6 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,7 +43,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();
// update the current file for Spark's filename() function
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 391d4a0..8770e17 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -56,7 +55,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final boolean caseSensitive;
RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
- super(task, table.io(), table.encryption());
+ super(table, task);
this.tableSchema = table.schema();
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
@@ -69,7 +68,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();
// update the current file for Spark's filename() function
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
index 51b47cb..8bae666 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
@@ -39,8 +38,6 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
@@ -59,7 +56,7 @@ public abstract class TestSparkBaseDataReader {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private static final Configuration CONFD = new Configuration();
+ private Table table;
// Simulates the closeable iterator of data to be read
private static class CloseableIntegerRange implements CloseableIterator<Integer> {
@@ -92,10 +89,8 @@ public abstract class TestSparkBaseDataReader {
private static class ClosureTrackingReader extends BaseDataReader<Integer> {
private Map<String, CloseableIntegerRange> tracker = new HashMap<>();
- ClosureTrackingReader(List<FileScanTask> tasks) {
- super(new BaseCombinedScanTask(tasks),
- new HadoopFileIO(CONFD),
- new PlaintextEncryptionManager());
+ ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
+ super(table, new BaseCombinedScanTask(tasks));
}
@Override
@@ -124,7 +119,7 @@ public abstract class TestSparkBaseDataReader {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
int countRecords = 0;
while (reader.next()) {
@@ -151,7 +146,7 @@ public abstract class TestSparkBaseDataReader {
FileScanTask firstTask = tasks.get(0);
FileScanTask secondTask = tasks.get(1);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
// Total of 2 elements
Assert.assertTrue(reader.next());
@@ -175,7 +170,7 @@ public abstract class TestSparkBaseDataReader {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
reader.close();
@@ -191,7 +186,7 @@ public abstract class TestSparkBaseDataReader {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
Integer halfDataSize = (totalTasks * recordPerTask) / 2;
for (int i = 0; i < halfDataSize; i++) {
@@ -217,7 +212,7 @@ public abstract class TestSparkBaseDataReader {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
// Total 100 elements, only 5 iterators have been created
for (int i = 0; i < 45; i++) {
@@ -250,7 +245,7 @@ public abstract class TestSparkBaseDataReader {
);
try {
- Table table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
+ this.table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
// Important: use the table's schema for the rest of the test
// When tables are created, the column ids are reassigned.
Schema tableSchema = table.schema();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 633a171..483dbcf 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -149,7 +149,7 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
// metadata columns
List<Types.NestedField> fields = metaColumns.stream()
.distinct()
- .map(MetadataColumns::get)
+ .map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
Schema meta = new Schema(fields);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
new file mode 100644
index 0000000..afb1136
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+// TODO: remove this class once we compile against Spark 3.2
+public class SparkTestTable extends SparkTable {
+
+ private final String[] metadataColumnNames;
+
+ public SparkTestTable(Table icebergTable, String[] metadataColumnNames, boolean refreshEagerly) {
+ super(icebergTable, refreshEagerly);
+ this.metadataColumnNames = metadataColumnNames;
+ }
+
+ @Override
+ public StructType schema() {
+ StructType schema = super.schema();
+ if (metadataColumnNames != null) {
+ for (String columnName : metadataColumnNames) {
+ Types.NestedField metadataColumn = MetadataColumns.metadataColumn(table(), columnName);
+ schema = schema.add(columnName, SparkSchemaUtil.convert(metadataColumn.type()));
+ }
+ }
+ return schema;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ SparkScanBuilder scanBuilder = (SparkScanBuilder) super.newScanBuilder(options);
+ if (metadataColumnNames != null) {
+ scanBuilder.withMetadataColumns(metadataColumnNames);
+ }
+ return scanBuilder;
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 92013d3..027c88c 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.spark.source;
-import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -31,7 +30,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
- TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
- return new SparkTable(table, false);
+ String[] parts = ident.name().split("\\$", 2);
+ if (parts.length == 2) {
+ TestTables.TestTable table = TestTables.load(parts[0]);
+ String[] metadataColumns = parts[1].split(",");
+ return new SparkTestTable(table, metadataColumns, false);
+ } else {
+ TestTables.TestTable table = TestTables.load(ident.name());
+ return new SparkTestTable(table, null, false);
+ }
}
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
new file mode 100644
index 0000000..b29d281
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
+import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+
+@RunWith(Parameterized.class)
+public class TestSparkMetadataColumns extends SparkTestBase {
+
+ private static final String TABLE_NAME = "test_table";
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "category", Types.StringType.get()),
+ Types.NestedField.optional(3, "data", Types.StringType.get())
+ );
+ private static final PartitionSpec UNKNOWN_SPEC = PartitionSpecParser.fromJson(SCHEMA,
+ "{ \"spec-id\": 1, \"fields\": [ { \"name\": \"id_zero\", \"transform\": \"zero\", \"source-id\": 1 } ] }");
+
+ @Parameterized.Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ { FileFormat.PARQUET, false, 1},
+ { FileFormat.PARQUET, true, 1},
+ { FileFormat.PARQUET, false, 2},
+ { FileFormat.PARQUET, true, 2},
+ { FileFormat.AVRO, false, 1},
+ { FileFormat.AVRO, false, 2},
+ { FileFormat.ORC, false, 1},
+ { FileFormat.ORC, true, 1},
+ { FileFormat.ORC, false, 2},
+ { FileFormat.ORC, true, 2},
+ };
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private final FileFormat fileFormat;
+ private final boolean vectorized;
+ private final int formatVersion;
+
+ private Table table = null;
+
+ public TestSparkMetadataColumns(FileFormat fileFormat, boolean vectorized, int formatVersion) {
+ this.fileFormat = fileFormat;
+ this.vectorized = vectorized;
+ this.formatVersion = formatVersion;
+ }
+
+ @BeforeClass
+ public static void setupSpark() {
+ ImmutableMap<String, String> config = ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "cache-enabled", "true"
+ );
+ spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+ config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value));
+ }
+
+ @Before
+ public void setupTable() throws IOException {
+ createAndInitTable();
+ }
+
+ @After
+ public void dropTable() {
+ TestTables.clearTables();
+ }
+
+ // TODO: remove testing workarounds once we compile against Spark 3.2
+
+ @Test
+ public void testSpecAndPartitionMetadataColumns() {
+ // TODO: support metadata structs in vectorized ORC reads
+ Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .addField("data")
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .addField(Expressions.bucket("category", 8))
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .removeField("data")
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .renameField("category_bucket_8", "category_bucket_8_another_name")
+ .commit();
+
+ List<Object[]> expected = ImmutableList.of(
+ row(0, row(null, null)),
+ row(1, row("b1", null)),
+ row(2, row("b1", 2)),
+ row(3, row(null, 2))
+ );
+ assertEquals("Rows must match", expected,
+ sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
+ }
+
+ @Test
+ public void testPartitionMetadataColumnWithUnknownTransforms() {
+ // replace the table spec to include an unknown transform
+ TableOperations ops = ((HasTableOperations) table).operations();
+ TableMetadata base = ops.current();
+ ops.commit(base, base.updatePartitionSpec(UNKNOWN_SPEC));
+
+ AssertHelpers.assertThrows("Should fail to query the partition metadata column",
+ ValidationException.class, "Cannot build table partition type, unknown transforms",
+ () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME));
+ }
+
+ private void createAndInitTable() throws IOException {
+ this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());
+
+ UpdateProperties updateProperties = table.updateProperties();
+ updateProperties.set(FORMAT_VERSION, String.valueOf(formatVersion));
+ updateProperties.set(DEFAULT_FILE_FORMAT, fileFormat.name());
+
+ switch (fileFormat) {
+ case PARQUET:
+ updateProperties.set(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+ break;
+ case ORC:
+ updateProperties.set(ORC_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+ break;
+ default:
+ Preconditions.checkState(!vectorized, "File format %s does not support vectorized reads", fileFormat);
+ }
+
+ updateProperties.commit();
+ }
+}