Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/11/11 17:47:50 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2357: DRILL-8027: Format plugin for Apache Iceberg

vvysotskyi commented on a change in pull request #2357:
URL: https://github.com/apache/drill/pull/2357#discussion_r747693599



##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/format/IcebergFormatPlugin.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.iceberg.IcebergGroupScan;
+import org.apache.drill.exec.store.iceberg.plan.IcebergPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class IcebergFormatPlugin implements FormatPlugin {
+
+  private static final String ICEBERG_CONVENTION_PREFIX = "ICEBERG.";
+
+  private final FileSystemConfig storageConfig;
+
+  private final IcebergFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final IcebergFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public IcebergFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    IcebergFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new IcebergFormatMatcher(this);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name);
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(ICEBERG_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, IcebergPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {

Review comment:
       Yep, in the future, we definitely should add it.

##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/IcebergGroupScan.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.iceberg.format.IcebergFormatPlugin;
+import org.apache.drill.exec.store.iceberg.plan.DrillExprToIcebergTranslator;
+import org.apache.drill.exec.store.iceberg.snapshot.Snapshot;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+@Slf4j
+@JsonTypeName("iceberg-scan")
+@SuppressWarnings("unused")
+public class IcebergGroupScan extends AbstractGroupScan {
+
+  private final IcebergFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final TupleMetadata schema;
+
+  private final LogicalExpression condition;
+
+  private final DrillFileSystem fs;
+
+  private final List<SchemaPath> columns;
+
+  private int maxRecords;
+
+  private List<IcebergCompleteWork> chunks;
+
+  private TableScan tableScan;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  private ListMultimap<Integer, IcebergCompleteWork> mappings;
+
+  @JsonCreator
+  public IcebergGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("schema") TupleMetadata schema,
+      @JsonProperty("path") String path,
+      @JsonProperty("condition") LogicalExpression condition,
+      @JsonProperty("maxRecords") Integer maxRecords,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolveFormat(storageConfig, formatConfig, IcebergFormatPlugin.class),
+        schema, path, condition, columns, maxRecords);
+  }
+
+  @Builder(toBuilder = true)
+  private IcebergGroupScan(String userName, IcebergFormatPlugin formatPlugin,
+    TupleMetadata schema, String path, LogicalExpression condition, List<SchemaPath> columns, int maxRecords) {
+    super(userName);
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.path = path;
+    this.schema = schema;
+    this.condition = condition;
+    this.maxRecords = maxRecords;
+    this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+    this.tableScan = initTableScan(formatPlugin, path, condition);
+
+    init();
+  }
+
+  public static TableScan initTableScan(IcebergFormatPlugin formatPlugin, String path, LogicalExpression condition) {
+    TableScan tableScan = new HadoopTables(formatPlugin.getFsConf()).load(path).newScan();
+    Map<String, String> properties = formatPlugin.getConfig().getProperties();
+    if (properties != null) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        tableScan = tableScan.option(entry.getKey(), entry.getValue());
+      }
+    }
+    if (condition != null) {
+      Expression expression = condition.accept(
+        DrillExprToIcebergTranslator.INSTANCE, null);
+      tableScan = tableScan.filter(expression);
+    }
+    Snapshot snapshot = formatPlugin.getConfig().getSnapshot();
+    if (snapshot != null) {
+      tableScan = snapshot.apply(tableScan);
+    }
+    Boolean caseSensitive = formatPlugin.getConfig().getCaseSensitive();
+    if (caseSensitive != null) {
+      tableScan = tableScan.caseSensitive(caseSensitive);
+    }
+    Boolean includeColumnStats = formatPlugin.getConfig().getIncludeColumnStats();
+    if (includeColumnStats != null && includeColumnStats) {
+      tableScan = tableScan.includeColumnStats();
+    }
+    Boolean ignoreResiduals = formatPlugin.getConfig().getIgnoreResiduals();
+    if (ignoreResiduals != null && ignoreResiduals) {
+      tableScan = tableScan.ignoreResiduals();
+    }
+    return tableScan;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   *
+   * @param that The IcebergGroupScan to clone
+   */
+  private IcebergGroupScan(IcebergGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.formatPlugin = that.formatPlugin;
+    this.path = that.path;
+    this.condition = that.condition;
+    this.schema = that.schema;
+    this.mappings = that.mappings;
+    this.fs = that.fs;
+    this.maxRecords = that.maxRecords;
+    this.chunks = that.chunks;
+    this.tableScan = that.tableScan;
+    this.endpointAffinities = that.endpointAffinities;
+  }
+
+  @Override
+  public IcebergGroupScan clone(List<SchemaPath> columns) {
+    return toBuilder().columns(columns).build();
+  }
+
+  @Override
+  public IcebergGroupScan applyLimit(int maxRecords) {
+    IcebergGroupScan clone = new IcebergGroupScan(this);
+    clone.maxRecords = maxRecords;
+    return clone;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    mappings = AssignmentCreator.getMappings(endpoints, chunks);
+  }
+
+  private void createMappings(List<EndpointAffinity> affinities) {
+    List<DrillbitEndpoint> endpoints = affinities.stream()
+      .map(EndpointAffinity::getEndpoint)
+      .collect(Collectors.toList());
+    applyAssignments(endpoints);
+  }
+
+  @Override
+  public IcebergSubScan getSpecificScan(int minorFragmentId) {
+    if (mappings == null) {
+      createMappings(endpointAffinities);
+    }
+    assert minorFragmentId < mappings.size() : String.format(

Review comment:
       It can be caused only by some issue in the code, not by something the user can affect.

##########
File path: contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.iceberg.format.IcebergFormatPluginConfig;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
+import org.hamcrest.MatcherAssert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.drill.test.TestBuilder.mapOfObject;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IcebergQueriesTest extends ClusterTest {
+  private static Table table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("iceberg", IcebergFormatPluginConfig.builder().build());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    Configuration config = new Configuration();
+    config.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+
+    HadoopTables tables = new HadoopTables(config);
+    Schema structSchema = new Schema(
+      Types.NestedField.optional(13, "struct_int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(14, "struct_string_field", Types.StringType.get())
+    );
+    Types.ListType repeatedStructType =  Types.ListType.ofOptional(
+      16, Types.StructType.of(
+      Types.NestedField.optional(17, "struct_int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(18, "struct_string_field", Types.StringType.get())
+    ));
+    Schema schema = new Schema(
+      Types.NestedField.optional(1, "int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "long_field", Types.LongType.get()),
+      Types.NestedField.optional(3, "float_field", Types.FloatType.get()),
+      Types.NestedField.optional(4, "double_field", Types.DoubleType.get()),
+      Types.NestedField.optional(5, "string_field", Types.StringType.get()),
+      Types.NestedField.optional(6, "boolean_field", Types.BooleanType.get()),
+      Types.NestedField.optional(26, "time_field", Types.TimeType.get()),
+      Types.NestedField.optional(27, "timestamp_field", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(28, "date_field", Types.DateType.get()),
+      Types.NestedField.optional(29, "decimal_field", Types.DecimalType.of(4, 2)),
+      Types.NestedField.optional(30, "uuid_field", Types.UUIDType.get()),
+      Types.NestedField.optional(31, "fixed_field", Types.FixedType.ofLength(10)),
+      Types.NestedField.optional(32, "binary_field", Types.BinaryType.get()),
+      Types.NestedField.optional(7, "list_field", Types.ListType.ofOptional(
+        10, Types.StringType.get())),
+      Types.NestedField.optional(8, "map_field", Types.MapType.ofOptional(
+        11, 12, Types.StringType.get(), Types.FloatType.get())),
+      Types.NestedField.required(9, "struct_field", structSchema.asStruct()),
+      Types.NestedField.required(15, "repeated_struct_field", repeatedStructType),
+      Types.NestedField.required(19, "repeated_list_field", Types.ListType.ofOptional(
+        20, Types.ListType.ofOptional(21, Types.StringType.get()))),
+      Types.NestedField.optional(22, "repeated_map_field", Types.ListType.ofOptional(
+        23, Types.MapType.ofOptional(24, 25, Types.StringType.get(), Types.FloatType.get())))
+    );
+
+    List<String> listValue = Arrays.asList("a", "b", "c");
+
+    Map<String, Float> mapValue = new HashMap<>();
+    mapValue.put("a", 0.1F);
+    mapValue.put("b", 0.2F);
+
+    Map<String, Float> secondMapValue = new HashMap<>();
+    secondMapValue.put("true", 1F);
+    secondMapValue.put("false", 0F);
+
+    Record structValue = GenericRecord.create(structSchema);
+    structValue.setField("struct_int_field", 123);
+    structValue.setField("struct_string_field", "abc");
+
+    Record secondStructValue = GenericRecord.create(structSchema);
+    secondStructValue.setField("struct_int_field", 321);
+    secondStructValue.setField("struct_string_field", "def");
+
+    Record record = GenericRecord.create(schema);
+    record.setField("int_field", 1);
+    record.setField("long_field", 100L);
+    record.setField("float_field", 0.5F);
+    record.setField("double_field", 1.5D);
+    record.setField("string_field", "abc");
+    record.setField("boolean_field", true);
+    record.setField("time_field", LocalTime.of(2, 42, 42));
+    record.setField("timestamp_field", LocalDateTime.of(1994, 4, 18, 11, 0, 0));
+    record.setField("date_field", LocalDate.of(1994, 4, 18));
+    record.setField("decimal_field", new BigDecimal("12.34"));
+    record.setField("uuid_field", new byte[16]);
+    record.setField("fixed_field", new byte[10]);
+    record.setField("binary_field", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
+    record.setField("list_field", listValue);
+    record.setField("map_field", mapValue);
+    record.setField("struct_field", structValue);
+    record.setField("repeated_struct_field", Arrays.asList(structValue, structValue));
+    record.setField("repeated_list_field", Arrays.asList(listValue, listValue));
+    record.setField("repeated_map_field", Arrays.asList(mapValue, mapValue));
+
+    Record nullsRecord = GenericRecord.create(schema);
+    nullsRecord.setField("int_field", null);
+    nullsRecord.setField("long_field", null);
+    nullsRecord.setField("float_field", null);
+    nullsRecord.setField("double_field", null);
+    nullsRecord.setField("string_field", null);
+    nullsRecord.setField("boolean_field", null);
+    nullsRecord.setField("time_field", null);
+    nullsRecord.setField("timestamp_field", null);
+    nullsRecord.setField("date_field", null);
+    nullsRecord.setField("decimal_field", null);
+    nullsRecord.setField("uuid_field", null);
+    nullsRecord.setField("fixed_field", null);
+    nullsRecord.setField("binary_field", null);
+    nullsRecord.setField("list_field", null);
+    nullsRecord.setField("map_field", null);
+    nullsRecord.setField("struct_field", GenericRecord.create(structSchema));
+    nullsRecord.setField("repeated_struct_field", Collections.emptyList());
+    nullsRecord.setField("repeated_list_field", Collections.emptyList());
+    nullsRecord.setField("repeated_map_field", Collections.emptyList());
+
+    Record secondRecord = GenericRecord.create(schema);
+    secondRecord.setField("int_field", 988);
+    secondRecord.setField("long_field", 543L);
+    secondRecord.setField("float_field", Float.NaN);
+    secondRecord.setField("double_field", Double.MAX_VALUE);
+    secondRecord.setField("string_field", "def");
+    secondRecord.setField("boolean_field", false);
+    secondRecord.setField("time_field", LocalTime.of(3, 41, 53));
+    secondRecord.setField("timestamp_field", LocalDateTime.of(1995, 9, 10, 9, 0, 0));
+    secondRecord.setField("date_field", LocalDate.of(1995, 9, 10));
+    secondRecord.setField("decimal_field", new BigDecimal("99.99"));
+    secondRecord.setField("uuid_field", new byte[16]);
+    secondRecord.setField("fixed_field", new byte[10]);
+    secondRecord.setField("binary_field", ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
+    secondRecord.setField("list_field", Arrays.asList("y", "n"));
+    secondRecord.setField("map_field", secondMapValue);
+    secondRecord.setField("struct_field", secondStructValue);
+    secondRecord.setField("repeated_struct_field", Arrays.asList(structValue, secondStructValue));
+    secondRecord.setField("repeated_list_field", Arrays.asList(listValue, Arrays.asList("y", "n")));
+    secondRecord.setField("repeated_map_field", Arrays.asList(mapValue, secondMapValue));
+
+    String location = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypes").toUri().getPath();
+    table = tables.create(schema, location);
+
+    writeParquetAndCommitDataFile(table, "allTypes", Arrays.asList(record, nullsRecord));
+    writeParquetAndCommitDataFile(table, "allTypes_1", Collections.singleton(secondRecord));
+
+    String avroLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesAvro").toUri().getPath();
+    writeAndCommitDataFile(tables.create(structSchema, avroLocation), "allTypes", FileFormat.AVRO,
+      Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
+
+    String orcLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesOrc").toUri().getPath();
+    writeAndCommitDataFile(tables.create(structSchema, orcLocation), "allTypes", FileFormat.ORC,
+      Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
+
+    String emptyTableLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesEmpty").toUri().getPath();
+    tables.create(structSchema, emptyTableLocation);
+  }
+
+  private static void writeParquetAndCommitDataFile(Table table, String name, Iterable<Record> records) throws IOException {
+    writeAndCommitDataFile(table, name, FileFormat.PARQUET, records);
+  }
+
+  private static void writeAndCommitDataFile(Table table, String name, FileFormat fileFormat, Iterable<Record> records) throws IOException {
+    OutputFile outputFile = table.io().newOutputFile(
+      new Path(table.location(), fileFormat.addExtension(name)).toUri().getPath());
+
+    FileAppender<Record> fileAppender = new GenericAppenderFactory(table.schema())
+      .newAppender(outputFile, fileFormat);
+    fileAppender.addAll(records);
+    fileAppender.close();
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+      .withInputFile(outputFile.toInputFile())
+      .withMetrics(fileAppender.metrics())
+      .build();
+
+    Transaction transaction = table.newTransaction();
+    transaction.newAppend()
+      .appendFile(dataFile)
+      .commit();
+    transaction.commitTransaction();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder().sql("select * from dfs.tmp.testAllTypes").explainJson();
+    queryBuilder().physical(plan).run();
+  }
+
+  @Test
+  public void testSelectWithSnapshotId() throws Exception {
+    String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
+
+    String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotId => %s))";
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String plan = queryBuilder().sql(query, snapshotId).explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotAsOfTime() throws Exception {
+    String snapshotQuery = "select committed_at from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
+
+    String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotAsOfTime => %s))";
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String plan = queryBuilder().sql(query, snapshotId).explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectFromSnapshotId() throws Exception {
+    String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
+
+    String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => %s))";
+
+    long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
+    String plan = queryBuilder().sql(query, snapshotId).explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testSelectFromSnapshotIdAndToSnapshotId() throws Exception {
+    String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at";
+
+    String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => %s, toSnapshotId => %s))";
+
+    DirectRowSet rowSet = queryBuilder().sql(snapshotQuery).rowSet();
+    try {
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      Long fromSnapshotId = (Long) reader.column(0).reader().getObject();
+      assertTrue(reader.next());
+      Long toSnapshotId = (Long) reader.column(0).reader().getObject();
+
+      String plan = queryBuilder().sql(query, fromSnapshotId, toSnapshotId).explainJson();
+      long count = queryBuilder().physical(plan).run().recordCount();
+      assertEquals(1, count);
+    } finally {
+      rowSet.clear();
+    }
+  }
+
+  @Test
+  public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
+    String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotId => %s, snapshotAsOfTime => %s))";
+    try {
+      queryBuilder().sql(query, 123, 456).run();
+    } catch (UserRemoteException e) {
+      MatcherAssert.assertThat(e.getVerboseMessage(), containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified"));
+    }
+  }
+
+  @Test
+  public void testAllTypes() throws Exception {

Review comment:
       Yes, it supports arrays, and this test also checks that they can be read correctly. The test below also checks that project pushdown works correctly when selecting array indexes.
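       For illustration, a query of roughly the following shape (a sketch only; the actual test queries may differ) selects individual array elements, and only the referenced columns are projected into the Iceberg scan. Table and column names are taken from the test schema quoted above:
       ```sql
       SELECT t.repeated_list_field[1],
              t.repeated_struct_field[0].struct_int_field
       FROM dfs.tmp.testAllTypes t
       ```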

##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/IcebergGroupScan.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.iceberg.format.IcebergFormatPlugin;
+import org.apache.drill.exec.store.iceberg.plan.DrillExprToIcebergTranslator;
+import org.apache.drill.exec.store.iceberg.snapshot.Snapshot;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+@Slf4j
+@JsonTypeName("iceberg-scan")
+@SuppressWarnings("unused")
+public class IcebergGroupScan extends AbstractGroupScan {
+
+  private final IcebergFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final TupleMetadata schema;
+
+  private final LogicalExpression condition;
+
+  private final DrillFileSystem fs;
+
+  private final List<SchemaPath> columns;
+
+  private int maxRecords;
+
+  private List<IcebergCompleteWork> chunks;
+
+  private TableScan tableScan;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  private ListMultimap<Integer, IcebergCompleteWork> mappings;
+
+  @JsonCreator
+  public IcebergGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("schema") TupleMetadata schema,
+      @JsonProperty("path") String path,
+      @JsonProperty("condition") LogicalExpression condition,
+      @JsonProperty("maxRecords") Integer maxRecords,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolveFormat(storageConfig, formatConfig, IcebergFormatPlugin.class),
+        schema, path, condition, columns, maxRecords);
+  }
+
+  @Builder(toBuilder = true)
+  private IcebergGroupScan(String userName, IcebergFormatPlugin formatPlugin,
+    TupleMetadata schema, String path, LogicalExpression condition, List<SchemaPath> columns, int maxRecords) {
+    super(userName);
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.path = path;
+    this.schema = schema;
+    this.condition = condition;
+    this.maxRecords = maxRecords;
+    this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+    this.tableScan = initTableScan(formatPlugin, path, condition);
+
+    init();
+  }
+
+  public static TableScan initTableScan(IcebergFormatPlugin formatPlugin, String path, LogicalExpression condition) {
+    TableScan tableScan = new HadoopTables(formatPlugin.getFsConf()).load(path).newScan();
+    Map<String, String> properties = formatPlugin.getConfig().getProperties();
+    if (properties != null) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        tableScan = tableScan.option(entry.getKey(), entry.getValue());
+      }
+    }
+    if (condition != null) {
+      Expression expression = condition.accept(
+        DrillExprToIcebergTranslator.INSTANCE, null);
+      tableScan = tableScan.filter(expression);
+    }
+    Snapshot snapshot = formatPlugin.getConfig().getSnapshot();
+    if (snapshot != null) {
+      tableScan = snapshot.apply(tableScan);
+    }
+    Boolean caseSensitive = formatPlugin.getConfig().getCaseSensitive();
+    if (caseSensitive != null) {
+      tableScan = tableScan.caseSensitive(caseSensitive);
+    }
+    Boolean includeColumnStats = formatPlugin.getConfig().getIncludeColumnStats();
+    if (includeColumnStats != null && includeColumnStats) {
+      tableScan = tableScan.includeColumnStats();
+    }
+    Boolean ignoreResiduals = formatPlugin.getConfig().getIgnoreResiduals();
+    if (ignoreResiduals != null && ignoreResiduals) {
+      tableScan = tableScan.ignoreResiduals();
+    }
+    return tableScan;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   *
+   * @param that The IcebergGroupScan to clone
+   */
+  private IcebergGroupScan(IcebergGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.formatPlugin = that.formatPlugin;
+    this.path = that.path;
+    this.condition = that.condition;
+    this.schema = that.schema;
+    this.mappings = that.mappings;
+    this.fs = that.fs;
+    this.maxRecords = that.maxRecords;
+    this.chunks = that.chunks;
+    this.tableScan = that.tableScan;
+    this.endpointAffinities = that.endpointAffinities;
+  }
+
+  @Override
+  public IcebergGroupScan clone(List<SchemaPath> columns) {
+    return toBuilder().columns(columns).build();
+  }
+
+  @Override
+  public IcebergGroupScan applyLimit(int maxRecords) {
+    IcebergGroupScan clone = new IcebergGroupScan(this);
+    clone.maxRecords = maxRecords;
+    return clone;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    mappings = AssignmentCreator.getMappings(endpoints, chunks);
+  }
+
+  private void createMappings(List<EndpointAffinity> affinities) {
+    List<DrillbitEndpoint> endpoints = affinities.stream()
+      .map(EndpointAffinity::getEndpoint)
+      .collect(Collectors.toList());
+    applyAssignments(endpoints);
+  }
+
+  @Override
+  public IcebergSubScan getSpecificScan(int minorFragmentId) {
+    if (mappings == null) {
+      createMappings(endpointAffinities);
+    }
+    assert minorFragmentId < mappings.size() : String.format(
+      "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
+      minorFragmentId);
+
+    List<IcebergCompleteWork> workList = mappings.get(minorFragmentId);
+
+    Preconditions.checkArgument(!workList.isEmpty(),
+      String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+    IcebergSubScan subScan = IcebergSubScan.builder()
+      .userName(userName)
+      .formatPlugin(formatPlugin)
+      .columns(columns)
+      .condition(condition)
+      .schema(schema)
+      .workList(convertWorkList(workList))
+      .tableScan(tableScan)
+      .path(path)
+      .maxRecords(maxRecords)
+      .build();
+
+    subScan.setOperatorId(getOperatorId());
+    return subScan;
+  }
+
+  private List<IcebergWork> convertWorkList(List<IcebergCompleteWork> workList) {
+    return workList.stream()
+      .map(IcebergCompleteWork::getScanTask)
+      .map(IcebergWork::new)
+      .collect(Collectors.toList());
+  }
+
+  @JsonIgnore
+  public TableScan getTableScan() {
+    return tableScan;
+  }
+
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return chunks.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    int expectedRecordsPerChunk = 1_000_000;
+    if (maxRecords >= 0) {
+      expectedRecordsPerChunk = Math.max(maxRecords, 1);
+    }
+    int estimatedRecords = chunks.size() * expectedRecordsPerChunk;
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new IcebergGroupScan(this);
+  }
+
+  @SneakyThrows
+  private void init() {
+    tableScan = projectColumns(tableScan, columns);
+    chunks = new IcebergBlockMapBuilder(fs, formatPlugin.getContext().getBits())
+      .generateFileWork(tableScan.planTasks());
+    endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+  }
+
+  public static TableScan projectColumns(TableScan tableScan, List<SchemaPath> columns) {
+    boolean hasStar = columns.stream()
+      .anyMatch(SchemaPath::isDynamicStar);
+    if (!hasStar) {
+      List<String> projectColumns = columns.stream()
+        .map(IcebergGroupScan::getPath)
+        .collect(Collectors.toList());
+      return tableScan.select(projectColumns);
+    }
+    return tableScan;
+  }
+
+  public static String getPath(SchemaPath schemaPath) {
+    StringBuilder sb = new StringBuilder();
+    PathSegment segment = schemaPath.getRootSegment();
+    sb.append(segment.getNameSegment().getPath());
+
+    while ((segment = segment.getChild()) != null) {
+      sb.append('.')
+        .append(segment.isNamed()
+          ? segment.getNameSegment().getPath()
+          : "element");
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (endpointAffinities == null) {
+      logger.debug("Chunks size: {}", chunks.size());
+      endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+    }
+    return endpointAffinities;
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return false;
+  }
+
+  @Override
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("schema")
+  public TupleMetadata getSchema() {
+    return schema;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig() {
+    return formatPlugin.getConfig();
+  }
+
+  @JsonProperty("path")
+  public String getPath() {
+    return path;
+  }
+
+  @JsonProperty("condition")
+  public LogicalExpression getCondition() {
+    return condition;
+  }
+
+  @JsonIgnore
+  public IcebergFormatPlugin getFormatPlugin() {
+    return formatPlugin;
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", IcebergGroupScan.class.getSimpleName() + "[", "]")

Review comment:
       Thanks, added.

##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/IcebergGroupScan.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.iceberg.format.IcebergFormatPlugin;
+import org.apache.drill.exec.store.iceberg.plan.DrillExprToIcebergTranslator;
+import org.apache.drill.exec.store.iceberg.snapshot.Snapshot;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+@Slf4j
+@JsonTypeName("iceberg-scan")
+@SuppressWarnings("unused")
+public class IcebergGroupScan extends AbstractGroupScan {
+
+  private final IcebergFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final TupleMetadata schema;
+
+  private final LogicalExpression condition;
+
+  private final DrillFileSystem fs;
+
+  private final List<SchemaPath> columns;
+
+  private int maxRecords;
+
+  private List<IcebergCompleteWork> chunks;
+
+  private TableScan tableScan;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  private ListMultimap<Integer, IcebergCompleteWork> mappings;
+
+  @JsonCreator
+  public IcebergGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("schema") TupleMetadata schema,
+      @JsonProperty("path") String path,
+      @JsonProperty("condition") LogicalExpression condition,
+      @JsonProperty("maxRecords") Integer maxRecords,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolveFormat(storageConfig, formatConfig, IcebergFormatPlugin.class),
+        schema, path, condition, columns, maxRecords);
+  }
+
+  @Builder(toBuilder = true)
+  private IcebergGroupScan(String userName, IcebergFormatPlugin formatPlugin,
+    TupleMetadata schema, String path, LogicalExpression condition, List<SchemaPath> columns, int maxRecords) {
+    super(userName);
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.path = path;
+    this.schema = schema;
+    this.condition = condition;
+    this.maxRecords = maxRecords;
+    this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+    this.tableScan = initTableScan(formatPlugin, path, condition);
+
+    init();
+  }
+
+  public static TableScan initTableScan(IcebergFormatPlugin formatPlugin, String path, LogicalExpression condition) {
+    TableScan tableScan = new HadoopTables(formatPlugin.getFsConf()).load(path).newScan();
+    Map<String, String> properties = formatPlugin.getConfig().getProperties();
+    if (properties != null) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        tableScan = tableScan.option(entry.getKey(), entry.getValue());
+      }
+    }
+    if (condition != null) {
+      Expression expression = condition.accept(
+        DrillExprToIcebergTranslator.INSTANCE, null);
+      tableScan = tableScan.filter(expression);
+    }
+    Snapshot snapshot = formatPlugin.getConfig().getSnapshot();
+    if (snapshot != null) {
+      tableScan = snapshot.apply(tableScan);
+    }
+    Boolean caseSensitive = formatPlugin.getConfig().getCaseSensitive();
+    if (caseSensitive != null) {
+      tableScan = tableScan.caseSensitive(caseSensitive);
+    }
+    Boolean includeColumnStats = formatPlugin.getConfig().getIncludeColumnStats();
+    if (includeColumnStats != null && includeColumnStats) {
+      tableScan = tableScan.includeColumnStats();
+    }
+    Boolean ignoreResiduals = formatPlugin.getConfig().getIgnoreResiduals();
+    if (ignoreResiduals != null && ignoreResiduals) {
+      tableScan = tableScan.ignoreResiduals();
+    }
+    return tableScan;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   *
+   * @param that The IcebergGroupScan to clone
+   */
+  private IcebergGroupScan(IcebergGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.formatPlugin = that.formatPlugin;
+    this.path = that.path;
+    this.condition = that.condition;
+    this.schema = that.schema;
+    this.mappings = that.mappings;
+    this.fs = that.fs;
+    this.maxRecords = that.maxRecords;
+    this.chunks = that.chunks;
+    this.tableScan = that.tableScan;
+    this.endpointAffinities = that.endpointAffinities;
+  }
+
+  @Override
+  public IcebergGroupScan clone(List<SchemaPath> columns) {
+    return toBuilder().columns(columns).build();
+  }
+
+  @Override
+  public IcebergGroupScan applyLimit(int maxRecords) {
+    IcebergGroupScan clone = new IcebergGroupScan(this);
+    clone.maxRecords = maxRecords;
+    return clone;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    mappings = AssignmentCreator.getMappings(endpoints, chunks);
+  }
+
+  private void createMappings(List<EndpointAffinity> affinities) {
+    List<DrillbitEndpoint> endpoints = affinities.stream()
+      .map(EndpointAffinity::getEndpoint)
+      .collect(Collectors.toList());
+    applyAssignments(endpoints);
+  }
+
+  @Override
+  public IcebergSubScan getSpecificScan(int minorFragmentId) {
+    if (mappings == null) {
+      createMappings(endpointAffinities);
+    }
+    assert minorFragmentId < mappings.size() : String.format(
+      "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
+      minorFragmentId);
+
+    List<IcebergCompleteWork> workList = mappings.get(minorFragmentId);
+
+    Preconditions.checkArgument(!workList.isEmpty(),
+      String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));

Review comment:
       No, the user can't address it.

##########
File path: contrib/format-iceberg/README.md
##########
@@ -0,0 +1,99 @@
+# Apache Iceberg format plugin
+
+This format plugin enables Drill to query Apache Iceberg tables.
+
+Unlike regular format plugins, this one works with folders rather than individual files: an Iceberg table is a folder
+with data and metadata files. Drill checks for the presence of the `metadata` folder to confirm that the table is an Iceberg one.
+
+Drill supports reading all Iceberg data file formats available at the moment: Parquet, Avro, and ORC. There is no need
+to specify the actual table format; it is discovered automatically.
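+
+Querying an Iceberg table then looks like querying any other table; a minimal example, using the test table name that
+appears in the examples below:
+
+```sql
+SELECT *
+FROM dfs.tmp.testAllTypes
+```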
+
+For details related to Apache Iceberg table format, please refer to [official docs](https://iceberg.apache.org/#).
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project and filter pushdown optimizations.
+
+With project pushdown, only the columns specified in the query are read, including nested columns. In conjunction with
+column-oriented formats like Parquet or ORC, this can improve read performance significantly.
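+
+A minimal sketch (table and column names follow the test table used in the examples below): only `int_field` and the
+nested `struct_int_field` are read from the underlying data files:
+
+```sql
+SELECT t.int_field,
+       t.struct_field.struct_int_field
+FROM dfs.tmp.testAllTypes t
+```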
+
+### Filter pushdown
+
+With filter pushdown, every expression supported by the Iceberg API is pushed down, so only data that matches the
+filter expression is read.
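+
+For example, in the following sketch (again using the test table), both the comparison and the null check can be
+translated into Iceberg expressions, so data files that cannot contain matching rows are skipped:
+
+```sql
+SELECT int_field,
+       string_field
+FROM dfs.tmp.testAllTypes
+WHERE int_field > 100 AND string_field IS NOT NULL
+```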
+
+### Schema provisioning
+
+This format plugin supports the schema provisioning feature. Though Iceberg provides the table schema, in some cases it
+might be useful to select data with a customized schema; this can be done using the table function:
+
+```sql
+SELECT int_field,
+       string_field
+FROM table(dfs.tmp.testAllTypes(schema => 'inline=(int_field varchar not null default `error`)'))
+```
+
+In this example, we convert the int field to a string and return the `'error'` literal for null values.
+
+### Querying table metadata
+
+Apache Drill provides the ability to query any kind of table metadata Iceberg can return.
+
+At this point, Apache Iceberg has the following metadata kinds:
+
+* ENTRIES
+* FILES
+* HISTORY
+* SNAPSHOTS
+* MANIFESTS
+* PARTITIONS
+* ALL_DATA_FILES
+* ALL_MANIFESTS
+* ALL_ENTRIES
+
+To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testAllTypes#snapshots`
+```
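+
+Other metadata kinds are queried the same way; for example, the data files metadata (assuming the suffix is the
+lower-cased metadata kind name, as in the snapshots example above):
+
+```sql
+SELECT *
+FROM dfs.tmp.`testAllTypes#files`
+```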
+
+### Querying specific table versions (snapshots)
+
+Apache Iceberg can track table modifications and read a specific version from before or after a modification, or read
+the modifications themselves.
+
+This format plugin embraces this ability and provides an easy-to-use way of triggering it.
+
+The following ways of specifying table version are supported:
+
+- `snapshotId` - id of the specific snapshot
+- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds
+- `fromSnapshotId` - read appended data from `fromSnapshotId` exclusive to the current snapshot inclusive
+- \[`fromSnapshotId` : `toSnapshotId`\] - read appended data from `fromSnapshotId` exclusive to `toSnapshotId` inclusive
+
+A table function can be used to specify one of the above configs in the following way:
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotId => 123456789));
+
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotAsOfTime => 1636231332000));
+
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => 123456789));
+
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => 123456789, toSnapshotId => 987654321));
+```
+
+## Configuration
+
+The format plugin has the following configuration options:
+
+- `type` - format plugin type, should be `'iceberg'`
+- `properties` - Iceberg-specific table properties. Please refer to [Configuration](https://iceberg.apache.org/#configuration/) page for more details

Review comment:
       Good point, done

##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/DrillExprToIcebergTranslator.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg.plan;
+
+import lombok.Value;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.exec.store.iceberg.IcebergGroupScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+public class DrillExprToIcebergTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+  public static final ExprVisitor<Expression, Void, RuntimeException> INSTANCE = new DrillExprToIcebergTranslator();
+
+  @Override
+  public Expression visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+    switch (call.getName()) {
+      case FunctionNames.AND: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return Expressions.and(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.OR: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return Expressions.or(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.NOT: {
+        Expression expression = call.arg(0).accept(this, null);
+        if (expression != null) {
+          return Expressions.not(expression);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = IcebergGroupScan.getPath((SchemaPath) arg);
+          return Expressions.isNull(name);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = IcebergGroupScan.getPath((SchemaPath) arg);
+          return Expressions.notNull(name);
+        }
+        return null;
+      }
+      case FunctionNames.LT: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.lessThan(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+      case FunctionNames.LE: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.lessThanOrEqual(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+      case FunctionNames.GT: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.greaterThan(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+      case FunctionNames.GE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.greaterThanOrEqual(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+      case FunctionNames.EQ: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.equal(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+      case FunctionNames.NE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath && expression instanceof ConstantExpression) {
+          String name = IcebergGroupScan.getPath((SchemaPath) nameRef);
+          return Expressions.notEqual(name, ((ConstantExpression<?>) expression).getValue());
+        }
+        return null;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(fExpr.getFloat());
+  }
+
+  @Override
+  public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(intExpr.getInt());
+  }
+
+  @Override
+  public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(longExpr.getLong());
+  }
+
+  @Override
+  public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(decExpr.getIntFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(decExpr.getLongFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(dateExpr.getDate());
+  }
+
+  @Override
+  public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(timeExpr.getTime());
+  }
+
+  @Override
+  public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(timestampExpr.getTimeStamp());
+  }
+
+  @Override
+  public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) throws RuntimeException {
+    return new ConstantExpression<>(dExpr.getDouble());
+  }
+
+  @Override
+  public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) throws RuntimeException {
+    return new ConstantExpression<>(e.getBoolean());
+  }
+
+  @Override
+  public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) throws RuntimeException {
+    return new ConstantExpression<>(e.getString());
+  }
+
+  @Override
+  public Expression visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    return null;
+  }
+
+  @Value
+  private static class ConstantExpression<T> implements Expression {
+

Review comment:
       Done.
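
For context, the visitor above turns Drill filter expressions into Iceberg `Expressions` predicates, which an Iceberg table scan can then use to prune manifests and data files during planning. A minimal hand-written sketch of the target API follows; the table variable and column names are illustrative only, and Drill's actual wiring goes through `IcebergGroupScan` rather than a direct scan like this.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class FilterSketch {
  // Roughly the expression produced for:
  //   WHERE int_field >= 100 AND string_field IS NOT NULL
  static void scanWithFilter(Table table) {
    Expression filter = Expressions.and(
        Expressions.greaterThanOrEqual("int_field", 100),
        Expressions.notNull("string_field"));

    // Iceberg applies the predicate while planning, so only matching files are returned.
    table.newScan()
        .filter(filter)
        .planFiles()
        .forEach(task -> System.out.println(task.file().path()));
  }
}
```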

##########
File path: pom.xml
##########
@@ -137,6 +137,7 @@
     <lombok.version>1.18.20</lombok.version>
     <brotli-codec.version>0.1.1</brotli-codec.version>
     <aircompressor.version>0.20</aircompressor.version>
+    <iceberg.version>0.12.0</iceberg.version>

Review comment:
       Yep, thanks for pointing this out. When the PR was created, only 0.12.0 was available, but since a newer version has been released, we can use that one.

##########
File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/read/IcebergColumnConverterFactory.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg.read;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class IcebergColumnConverterFactory extends ColumnConverterFactory {
+
+  public IcebergColumnConverterFactory(TupleMetadata providedSchema) {
+    super(providedSchema);
+  }
+
+  @Override
+  protected ColumnConverter getMapConverter(TupleMetadata providedSchema,
+    TupleMetadata readerSchema, TupleWriter tupleWriter) {
+    Map<String, ColumnConverter> converters = StreamSupport.stream(readerSchema.spliterator(), false)
+      .collect(Collectors.toMap(
+        ColumnMetadata::name,
+        columnMetadata ->
+          getConverter(providedSchema, columnMetadata, tupleWriter.column(columnMetadata.name()))));
+
+    return new MapColumnConverter(this, providedSchema, tupleWriter, converters);
+  }
+
+  @Override
+  public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) {
+    switch (readerSchema.type()) {
+      case BIT:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setBoolean((Boolean) value));
+      case TIMESTAMP:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          Instant instant;
+          if (value instanceof LocalDateTime) {
+            LocalDateTime dateTime = (LocalDateTime) value;
+            instant = dateTime.toInstant(ZoneOffset.UTC);
+          } else {
+            instant = Instant.ofEpochMilli((Long) value / 1000);
+          }
+          writer.setTimestamp(instant);
+        });
+      case VARDECIMAL:
+        return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal((BigDecimal) value));
+      case VARBINARY:
+        return new ColumnConverter.ScalarColumnConverter(value -> {
+          byte[] bytes;
+          if (value instanceof ByteBuffer) {
+            ByteBuffer byteBuf = (ByteBuffer) value;
+            bytes = byteBuf.array();
+          } else {
+            bytes = (byte[]) value;
+          }
+          writer.setBytes(bytes, bytes.length);
+        });
+      default:
+        return super.buildScalar(readerSchema, writer);
+    }
+  }
+
+  public static ColumnMetadata getColumnMetadata(Types.NestedField field) {
+    Type type = field.type();
+    String name = field.name();
+    return getColumnMetadata(name, type, field.isOptional() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED);
+  }
+
+  private static ColumnMetadata getColumnMetadata(String name, Type type, TypeProtos.DataMode dataMode) {
+    switch (type.typeId()) {
+      case MAP:
+        return getDictColumnMetadata(name, type, dataMode);
+      case STRUCT:
+        return MetadataUtils.newMap(name, dataMode, convertSchema(type.asStructType()));
+      case LIST:
+        Type elementType = type.asListType().elementType();
+        switch (elementType.typeId()) {
+          case MAP:
+            return getDictColumnMetadata(name, elementType, TypeProtos.DataMode.REPEATED);
+          case STRUCT:
+            return MetadataUtils.newMapArray(name, convertSchema(elementType.asStructType()));
+          case LIST:
+            return MetadataUtils.newRepeatedList(name, getColumnMetadata(name, elementType, TypeProtos.DataMode.REPEATED));
+          default:
+            return getPrimitiveMetadata(name, elementType, TypeProtos.DataMode.REPEATED);
+        }
+      default:
+        return getPrimitiveMetadata(name, type, dataMode);
+    }
+  }
+
+  private static ColumnMetadata getPrimitiveMetadata(String name, Type type, TypeProtos.DataMode dataMode) {
+    TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
+      .setMinorType(getType(type))
+      .setMode(dataMode);
+    switch (type.typeId()) {
+      case DECIMAL: {
+        Types.DecimalType decimalType = (Types.DecimalType) type;
+        builder.setScale(decimalType.scale())
+          .setPrecision(decimalType.precision());
+        break;
+      }
+      case FIXED: {
+        Types.FixedType fixedType = (Types.FixedType) type;
+        builder.setWidth(fixedType.length());
+      }
+    }
+    MaterializedField materializedField = MaterializedField.create(name, builder.build());
+    return MetadataUtils.fromField(materializedField);
+  }
+
+  private static DictColumnMetadata getDictColumnMetadata(String name, Type type, TypeProtos.DataMode dataMode) {
+    MaterializedField dictField = SchemaBuilder.columnSchema(name, TypeProtos.MinorType.DICT, dataMode);
+    TupleSchema dictSchema = new TupleSchema();
+    dictSchema.add(getColumnMetadata(DictVector.FIELD_KEY_NAME, type.asMapType().keyType(), TypeProtos.DataMode.REQUIRED));
+    dictSchema.add(getColumnMetadata(DictVector.FIELD_VALUE_NAME, type.asMapType().valueType(), TypeProtos.DataMode.REQUIRED));
+    return MetadataUtils.newDict(dictField, dictSchema);
+  }
+
+  public static TupleSchema convertSchema(Types.StructType structType) {
+    TupleSchema schema = new TupleSchema();
+    for (Types.NestedField field : structType.fields()) {
+      ColumnMetadata columnMetadata = getColumnMetadata(field);
+      schema.add(columnMetadata);
+    }
+    return schema;
+  }
+
+  private static TypeProtos.MinorType getType(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return TypeProtos.MinorType.BIT;
+      case INTEGER:
+        return TypeProtos.MinorType.INT;
+      case LONG:
+        return TypeProtos.MinorType.BIGINT;
+      case FLOAT:
+        return TypeProtos.MinorType.FLOAT4;
+      case DOUBLE:
+        return TypeProtos.MinorType.FLOAT8;
+      case DATE:
+        return TypeProtos.MinorType.DATE;
+      case TIME:
+        return TypeProtos.MinorType.TIME;
+      case TIMESTAMP:
+        return TypeProtos.MinorType.TIMESTAMP;
+      case STRING:
+        return TypeProtos.MinorType.VARCHAR;
+      case UUID:
+      case FIXED:
+      case BINARY:
+        return TypeProtos.MinorType.VARBINARY;
+      case DECIMAL:
+        return TypeProtos.MinorType.VARDECIMAL;
+    }
+    throw new UnsupportedOperationException("Unsupported type: " + type);

Review comment:
       Done.
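
To make the type mapping above concrete, here is a small illustrative sketch of converting an Iceberg struct into Drill column metadata with the public helpers from this class; the column names are invented for the example and are not part of the PR.

```java
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.iceberg.read.IcebergColumnConverterFactory;
import org.apache.iceberg.types.Types;

class SchemaMappingSketch {
  static TupleSchema example() {
    // An Iceberg struct with a scalar, a list and a map field.
    Types.StructType struct = Types.StructType.of(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "tags",
            Types.ListType.ofOptional(3, Types.StringType.get())),
        Types.NestedField.optional(4, "attrs",
            Types.MapType.ofOptional(5, 6, Types.StringType.get(), Types.FloatType.get())));

    // Resulting Drill schema, per the mapping in getColumnMetadata/getType:
    //   id    -> BIGINT  (OPTIONAL)
    //   tags  -> VARCHAR (REPEATED)
    //   attrs -> DICT    (OPTIONAL) with VARCHAR key and FLOAT4 value
    return IcebergColumnConverterFactory.convertSchema(struct);
  }
}
```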

##########
File path: contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.iceberg;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.iceberg.format.IcebergFormatPluginConfig;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
+import org.hamcrest.MatcherAssert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.drill.test.TestBuilder.mapOfObject;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IcebergQueriesTest extends ClusterTest {
+  private static Table table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("iceberg", IcebergFormatPluginConfig.builder().build());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    Configuration config = new Configuration();
+    config.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+
+    HadoopTables tables = new HadoopTables(config);
+    Schema structSchema = new Schema(
+      Types.NestedField.optional(13, "struct_int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(14, "struct_string_field", Types.StringType.get())
+    );
+    Types.ListType repeatedStructType =  Types.ListType.ofOptional(
+      16, Types.StructType.of(
+      Types.NestedField.optional(17, "struct_int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(18, "struct_string_field", Types.StringType.get())
+    ));
+    Schema schema = new Schema(
+      Types.NestedField.optional(1, "int_field", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "long_field", Types.LongType.get()),
+      Types.NestedField.optional(3, "float_field", Types.FloatType.get()),
+      Types.NestedField.optional(4, "double_field", Types.DoubleType.get()),
+      Types.NestedField.optional(5, "string_field", Types.StringType.get()),
+      Types.NestedField.optional(6, "boolean_field", Types.BooleanType.get()),
+      Types.NestedField.optional(26, "time_field", Types.TimeType.get()),
+      Types.NestedField.optional(27, "timestamp_field", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(28, "date_field", Types.DateType.get()),
+      Types.NestedField.optional(29, "decimal_field", Types.DecimalType.of(4, 2)),
+      Types.NestedField.optional(30, "uuid_field", Types.UUIDType.get()),
+      Types.NestedField.optional(31, "fixed_field", Types.FixedType.ofLength(10)),
+      Types.NestedField.optional(32, "binary_field", Types.BinaryType.get()),
+      Types.NestedField.optional(7, "list_field", Types.ListType.ofOptional(
+        10, Types.StringType.get())),
+      Types.NestedField.optional(8, "map_field", Types.MapType.ofOptional(
+        11, 12, Types.StringType.get(), Types.FloatType.get())),
+      Types.NestedField.required(9, "struct_field", structSchema.asStruct()),
+      Types.NestedField.required(15, "repeated_struct_field", repeatedStructType),
+      Types.NestedField.required(19, "repeated_list_field", Types.ListType.ofOptional(
+        20, Types.ListType.ofOptional(21, Types.StringType.get()))),
+      Types.NestedField.optional(22, "repeated_map_field", Types.ListType.ofOptional(
+        23, Types.MapType.ofOptional(24, 25, Types.StringType.get(), Types.FloatType.get())))
+    );
+
+    List<String> listValue = Arrays.asList("a", "b", "c");
+
+    Map<String, Float> mapValue = new HashMap<>();
+    mapValue.put("a", 0.1F);
+    mapValue.put("b", 0.2F);
+
+    Map<String, Float> secondMapValue = new HashMap<>();
+    secondMapValue.put("true", 1F);
+    secondMapValue.put("false", 0F);
+
+    Record structValue = GenericRecord.create(structSchema);
+    structValue.setField("struct_int_field", 123);
+    structValue.setField("struct_string_field", "abc");
+
+    Record secondStructValue = GenericRecord.create(structSchema);
+    secondStructValue.setField("struct_int_field", 321);
+    secondStructValue.setField("struct_string_field", "def");
+
+    Record record = GenericRecord.create(schema);
+    record.setField("int_field", 1);
+    record.setField("long_field", 100L);
+    record.setField("float_field", 0.5F);
+    record.setField("double_field", 1.5D);
+    record.setField("string_field", "abc");
+    record.setField("boolean_field", true);
+    record.setField("time_field", LocalTime.of(2, 42, 42));
+    record.setField("timestamp_field", LocalDateTime.of(1994, 4, 18, 11, 0, 0));
+    record.setField("date_field", LocalDate.of(1994, 4, 18));
+    record.setField("decimal_field", new BigDecimal("12.34"));
+    record.setField("uuid_field", new byte[16]);
+    record.setField("fixed_field", new byte[10]);
+    record.setField("binary_field", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
+    record.setField("list_field", listValue);
+    record.setField("map_field", mapValue);
+    record.setField("struct_field", structValue);
+    record.setField("repeated_struct_field", Arrays.asList(structValue, structValue));
+    record.setField("repeated_list_field", Arrays.asList(listValue, listValue));
+    record.setField("repeated_map_field", Arrays.asList(mapValue, mapValue));
+
+    Record nullsRecord = GenericRecord.create(schema);
+    nullsRecord.setField("int_field", null);
+    nullsRecord.setField("long_field", null);
+    nullsRecord.setField("float_field", null);
+    nullsRecord.setField("double_field", null);
+    nullsRecord.setField("string_field", null);
+    nullsRecord.setField("boolean_field", null);
+    nullsRecord.setField("time_field", null);
+    nullsRecord.setField("timestamp_field", null);
+    nullsRecord.setField("date_field", null);
+    nullsRecord.setField("decimal_field", null);
+    nullsRecord.setField("uuid_field", null);
+    nullsRecord.setField("fixed_field", null);
+    nullsRecord.setField("binary_field", null);
+    nullsRecord.setField("list_field", null);
+    nullsRecord.setField("map_field", null);
+    nullsRecord.setField("struct_field", GenericRecord.create(structSchema));
+    nullsRecord.setField("repeated_struct_field", Collections.emptyList());
+    nullsRecord.setField("repeated_list_field", Collections.emptyList());
+    nullsRecord.setField("repeated_map_field", Collections.emptyList());
+
+    Record secondRecord = GenericRecord.create(schema);
+    secondRecord.setField("int_field", 988);
+    secondRecord.setField("long_field", 543L);
+    secondRecord.setField("float_field", Float.NaN);
+    secondRecord.setField("double_field", Double.MAX_VALUE);
+    secondRecord.setField("string_field", "def");
+    secondRecord.setField("boolean_field", false);
+    secondRecord.setField("time_field", LocalTime.of(3, 41, 53));
+    secondRecord.setField("timestamp_field", LocalDateTime.of(1995, 9, 10, 9, 0, 0));
+    secondRecord.setField("date_field", LocalDate.of(1995, 9, 10));
+    secondRecord.setField("decimal_field", new BigDecimal("99.99"));
+    secondRecord.setField("uuid_field", new byte[16]);
+    secondRecord.setField("fixed_field", new byte[10]);
+    secondRecord.setField("binary_field", ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
+    secondRecord.setField("list_field", Arrays.asList("y", "n"));
+    secondRecord.setField("map_field", secondMapValue);
+    secondRecord.setField("struct_field", secondStructValue);
+    secondRecord.setField("repeated_struct_field", Arrays.asList(structValue, secondStructValue));
+    secondRecord.setField("repeated_list_field", Arrays.asList(listValue, Arrays.asList("y", "n")));
+    secondRecord.setField("repeated_map_field", Arrays.asList(mapValue, secondMapValue));
+
+    String location = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypes").toUri().getPath();
+    table = tables.create(schema, location);
+
+    writeParquetAndCommitDataFile(table, "allTypes", Arrays.asList(record, nullsRecord));
+    writeParquetAndCommitDataFile(table, "allTypes_1", Collections.singleton(secondRecord));
+
+    String avroLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesAvro").toUri().getPath();
+    writeAndCommitDataFile(tables.create(structSchema, avroLocation), "allTypes", FileFormat.AVRO,
+      Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
+
+    String orcLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesOrc").toUri().getPath();
+    writeAndCommitDataFile(tables.create(structSchema, orcLocation), "allTypes", FileFormat.ORC,
+      Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
+
+    String emptyTableLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesEmpty").toUri().getPath();
+    tables.create(structSchema, emptyTableLocation);
+  }
+
+  private static void writeParquetAndCommitDataFile(Table table, String name, Iterable<Record> records) throws IOException {
+    writeAndCommitDataFile(table, name, FileFormat.PARQUET, records);
+  }
+
+  private static void writeAndCommitDataFile(Table table, String name, FileFormat fileFormat, Iterable<Record> records) throws IOException {
+    OutputFile outputFile = table.io().newOutputFile(
+      new Path(table.location(), fileFormat.addExtension(name)).toUri().getPath());
+
+    FileAppender<Record> fileAppender = new GenericAppenderFactory(table.schema())
+      .newAppender(outputFile, fileFormat);
+    fileAppender.addAll(records);
+    fileAppender.close();
+
+    DataFile dataFile = DataFiles.builder(table.spec())
+      .withInputFile(outputFile.toInputFile())
+      .withMetrics(fileAppender.metrics())
+      .build();
+
+    Transaction transaction = table.newTransaction();
+    transaction.newAppend()
+      .appendFile(dataFile)
+      .commit();
+    transaction.commitTransaction();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder().sql("select * from dfs.tmp.testAllTypes").explainJson();
+    queryBuilder().physical(plan).run();

Review comment:
       Added. Basically, it checks that no issues arise when obtaining a plan for the query and then submitting that plan for execution.
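
For readers following the thread, the round trip being described looks roughly like the sketch below. This is not the assertion actually added in the PR (that change is outside this hunk); the `succeeded()` and `recordCount()` accessors are assumed from Drill's test `QueryBuilder.QuerySummary`, and the expected three rows match the records written in the test setup above.

```java
@Test
public void testPlanRoundTrip() throws Exception {
  // Explain the query into a JSON physical plan, then submit that plan back.
  String plan = queryBuilder().sql("select * from dfs.tmp.testAllTypes").explainJson();
  QueryBuilder.QuerySummary summary = queryBuilder().physical(plan).run();

  // Accessor names below are assumed; the table in the setup contains three records.
  assertTrue(summary.succeeded());
  assertEquals(3, summary.recordCount());
}
```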




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org