Posted to commits@drill.apache.org by vo...@apache.org on 2020/03/30 20:42:13 UTC

[drill] branch master updated (f1ccdc2 -> 6d98c12)

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from f1ccdc2  DRILL-7665: Add UNION to schema parser
     new e27ef55  DRILL-6604: Upgrade Drill Hive client to Hive3.1 version
     new 4efa1f9  DRILL-7641: Convert Excel Reader to use Streaming Reader
     new 9173b0f  DRILL-7672: Make metadata type required when reading from / writing into Drill Metastore
     new 6d98c12  DRILL-7673: View set query fails with NPE for non-existing option

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 contrib/format-excel/pom.xml                       |   5 +
 .../drill/exec/store/excel/ExcelBatchReader.java   | 300 ++++++++++-----------
 .../drill/exec/store/excel/TestExcelFormat.java    |  16 +-
 contrib/storage-hive/core/pom.xml                  |  30 ++-
 .../storage-hive/core/src/main/codegen/config.fmpp |   3 +-
 .../main/codegen/{config.fmpp => configHive3.fmpp} |   3 +-
 .../core/src/main/codegen/data/Hive2DateTypes.tdd  |  21 +-
 .../core/src/main/codegen/data/Hive3DateTypes.tdd  |  33 +--
 .../core/src/main/codegen/data/HiveTypes.tdd       |  14 -
 .../codegen/templates/ObjectInspectorHelper.java   |  24 +-
 .../main/codegen/templates/ObjectInspectors.java   |  37 ++-
 .../exec/store/hive/HiveMetadataProvider.java      |   4 +-
 .../drill/exec/store/hive/HiveUtilities.java       |  39 +--
 .../client/DrillHiveMetaStoreClientFactory.java    |  36 ++-
 .../hive/writers/primitive/HiveDateWriter.java     |  16 +-
 .../writers/primitive/HiveTimestampWriter.java     |  16 +-
 .../org/apache/logging/log4j/util/Strings.java}    |  17 +-
 .../apache/drill/exec/hive/HiveTestFixture.java    |   2 +
 .../apache/drill/exec/hive/HiveTestUtilities.java  |  31 +--
 .../hive/BaseTestHiveImpersonation.java            |  28 +-
 .../exec/store/hive/HiveTestDataGenerator.java     |   6 +-
 .../analyze/FileMetadataInfoCollector.java         |   5 +-
 .../impl/metadata/MetadataControllerBatch.java     |   9 +-
 .../MetastoreDropTableMetadataHandler.java         |   9 +-
 .../planner/sql/handlers/SetOptionHandler.java     |   6 +-
 .../drill/exec/store/ischema/RecordCollector.java  |  31 +--
 .../planner/sql/handlers/SetOptionHandlerTest.java |  27 +-
 .../drill/exec/sql/TestMetastoreCommands.java      |  18 +-
 exec/rpc/pom.xml                                   |  10 +-
 metastore/iceberg-metastore/pom.xml                |   8 +-
 .../iceberg/components/tables/IcebergTables.java   |  19 +-
 .../iceberg/components/tables/TableKey.java        |  11 +-
 .../tables/TablesOperationTransformer.java         |   5 +-
 .../metastore/iceberg/operate/IcebergModify.java   |  49 ++--
 .../metastore/iceberg/operate/IcebergRead.java     |  43 ++-
 .../iceberg/schema/IcebergTableSchema.java         |  16 +-
 .../iceberg/transform/FilterExpressionVisitor.java |  29 +-
 .../iceberg/transform/FilterTransformer.java       |  36 ++-
 .../iceberg/transform/InputDataTransformer.java    |   1 -
 .../iceberg/transform/OperationTransformer.java    |  14 +-
 .../components/tables/TestBasicRequests.java       |  11 +-
 .../tables/TestIcebergTablesMetastore.java         |  44 ++-
 .../iceberg/components/tables/TestTableKey.java    |   9 +-
 .../tables/TestTablesOperationTransformer.java     |  25 +-
 .../iceberg/schema/TestIcebergTableSchema.java     | 141 +++++-----
 .../iceberg/transform/TestFilterTransformer.java   | 175 ++++++++----
 metastore/metastore-api/README.md                  |  71 +++--
 .../apache/drill/metastore/MetastoreColumn.java    |  77 ++++++
 .../drill/metastore/MetastoreFieldDefinition.java  |   7 +
 .../components/tables/BasicTablesRequests.java     | 130 ++++-----
 .../components/tables/TableMetadataUnit.java       | 123 ++++-----
 .../tables/TablesMetadataTypeValidator.java}       |  39 +--
 .../metastore/expressions/FilterExpression.java    |  50 ++--
 .../drill/metastore/expressions/IsPredicate.java   |  22 +-
 .../drill/metastore/expressions/ListPredicate.java |  24 +-
 .../metastore/expressions/SimplePredicate.java     |  40 +--
 .../drill/metastore/metadata/MetadataInfo.java     |   3 -
 .../apache/drill/metastore/metadata/TableInfo.java |  11 +-
 .../drill/metastore/operate/AbstractModify.java    |  54 ++++
 .../drill/metastore/operate/AbstractRead.java      |  71 +++++
 .../org/apache/drill/metastore/operate/Delete.java |  88 ++++++
 .../metastore/operate/MetadataTypeValidator.java   |  60 +++++
 .../org/apache/drill/metastore/operate/Modify.java |  31 ++-
 .../org/apache/drill/metastore/operate/Read.java   |  22 +-
 .../components/tables/TestBasicTablesRequests.java |  41 ++-
 .../tables/TestTablesMetadataTypeValidator.java    |  76 ++++++
 pom.xml                                            |  15 +-
 67 files changed, 1657 insertions(+), 830 deletions(-)
 copy contrib/storage-hive/core/src/main/codegen/{config.fmpp => configHive3.fmpp} (89%)
 copy exec/java-exec/src/main/codegen/data/DecimalAggrTypes2.tdd => contrib/storage-hive/core/src/main/codegen/data/Hive2DateTypes.tdd (61%)
 copy exec/java-exec/src/test/resources/drill-oom-xsort.conf => contrib/storage-hive/core/src/main/codegen/data/Hive3DateTypes.tdd (59%)
 copy contrib/{storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java => storage-hive/core/src/main/java/org/apache/logging/log4j/util/Strings.java} (67%)
 create mode 100644 metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreColumn.java
 copy metastore/metastore-api/src/main/java/org/apache/drill/metastore/{MetastoreFieldDefinition.java => components/tables/TablesMetadataTypeValidator.java} (51%)
 create mode 100644 metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
 create mode 100644 metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
 create mode 100644 metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
 create mode 100644 metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
 create mode 100644 metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java


[drill] 02/04: DRILL-7641: Convert Excel Reader to use Streaming Reader

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 4efa1f981618173dcbc5ba78e109e1f4c9776b1b
Author: Charles Givre <cg...@apache.org>
AuthorDate: Fri Mar 27 09:12:45 2020 -0400

    DRILL-7641: Convert Excel Reader to use Streaming Reader
    
    closes #2024
---
 contrib/format-excel/pom.xml                       |   5 +
 .../drill/exec/store/excel/ExcelBatchReader.java   | 300 ++++++++++-----------
 .../drill/exec/store/excel/TestExcelFormat.java    |  16 +-
 3 files changed, 158 insertions(+), 163 deletions(-)
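
For readers skimming the diff, here is a minimal standalone sketch (not part of this
commit; the file name and sheet index are illustrative only) of the streaming API the
reader now uses in place of loading a whole XSSFWorkbook into memory:

    import com.github.pjfanning.xlsx.StreamingReader;
    import org.apache.poi.ss.usermodel.Cell;
    import org.apache.poi.ss.usermodel.Row;
    import org.apache.poi.ss.usermodel.Sheet;
    import org.apache.poi.ss.usermodel.Workbook;

    import java.io.FileInputStream;
    import java.io.InputStream;

    public class StreamingReadSketch {
      public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream("data.xlsx");
             // Same builder settings the new ExcelBatchReader uses.
             Workbook workbook = StreamingReader.builder()
                 .rowCacheSize(100)  // number of rows kept in memory at a time
                 .bufferSize(4096)   // bytes buffered when reading the stream
                 .open(in)) {
          Sheet sheet = workbook.getSheetAt(0);
          // Rows and cells are materialized lazily as they are iterated.
          for (Row row : sheet) {
            for (Cell cell : row) {
              System.out.print(cell.getStringCellValue() + "\t");
            }
            System.out.println();
          }
        }
      }
    }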

diff --git a/contrib/format-excel/pom.xml b/contrib/format-excel/pom.xml
index ca6b6ab..c8be2fe 100644
--- a/contrib/format-excel/pom.xml
+++ b/contrib/format-excel/pom.xml
@@ -64,6 +64,11 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.pjfanning</groupId>
+      <artifactId>excel-streaming-reader</artifactId>
+      <version>2.3.2</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 32c062d..88d124b 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.drill.exec.store.excel;
 
+import com.github.pjfanning.xlsx.StreamingReader;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -28,19 +29,15 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.poi.ss.usermodel.Cell;
 import org.apache.poi.ss.usermodel.CellType;
-import org.apache.poi.ss.usermodel.CellValue;
 import org.apache.poi.ss.usermodel.DateUtil;
-import org.apache.poi.ss.usermodel.FormulaEvaluator;
 import org.apache.poi.ss.usermodel.Row;
-import org.apache.poi.xssf.usermodel.XSSFRow;
-import org.apache.poi.xssf.usermodel.XSSFSheet;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -50,6 +47,8 @@ import java.util.Date;
 import java.util.Iterator;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.TimeZone;
 
 public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
@@ -66,23 +65,25 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private static final String MISSING_FIELD_NAME_HEADER = "field_";
 
-  private final ExcelReaderConfig readerConfig;
+  private static final int ROW_CACHE_SIZE = 100;
+
+  private static final int BUFFER_SIZE = 4096;
 
-  private XSSFSheet sheet;
+  private final ExcelReaderConfig readerConfig;
 
-  private XSSFWorkbook workbook;
+  private Sheet sheet;
 
-  private InputStream fsStream;
+  private Row currentRow;
 
-  private FormulaEvaluator evaluator;
+  private Workbook workbook;
 
-  private ArrayList<String> excelFieldNames;
+  private InputStream fsStream;
 
-  private ArrayList<ScalarWriter> columnWriters;
+  private List<String> excelFieldNames;
 
-  private ArrayList<CellType> cellTypes;
+  private List<ScalarWriter> columnWriters;
 
-  private ArrayList<CellWriter> cellWriterArray;
+  private List<CellWriter> cellWriterArray;
 
   private Iterator<Row> rowIterator;
 
@@ -90,14 +91,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private int totalColumnCount;
 
-  private int lineCount;
-
   private boolean firstLine;
 
   private FileSplit split;
 
-  private ResultSetLoader loader;
-
   private int recordCount;
 
   static class ExcelReaderConfig {
@@ -134,92 +131,104 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
   @Override
   public boolean open(FileSchemaNegotiator negotiator) {
     split = negotiator.split();
-    loader = negotiator.build();
+    ResultSetLoader loader = negotiator.build();
     rowWriter = loader.writer();
     openFile(negotiator);
     defineSchema();
     return true;
   }
 
+  /**
+   * This method opens the Excel file, initializes the Streaming Excel Reader, and initializes the sheet variable.
+   * @param negotiator The Drill file negotiator object that represents the file system
+   */
   private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
     try {
       fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
-      workbook = new XSSFWorkbook(fsStream);
+
+      // Open streaming reader
+      workbook = StreamingReader.builder()
+        .rowCacheSize(ROW_CACHE_SIZE)
+        .bufferSize(BUFFER_SIZE)
+        .open(fsStream);
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
         .message("Failed to open open input file: %s", split.getPath().toString())
-        .message(e.getMessage())
+        .addContext(e.getMessage())
         .build(logger);
     }
-
-    // Evaluate formulae
-    evaluator = workbook.getCreationHelper().createFormulaEvaluator();
-
-    workbook.setMissingCellPolicy(Row.MissingCellPolicy.CREATE_NULL_AS_BLANK);
     sheet = getSheet();
   }
 
   /**
    * This function defines the schema from the header row.
-   * @return TupleMedata of the discovered schema
    */
-  private TupleMetadata defineSchema() {
+  private void defineSchema() {
     SchemaBuilder builder = new SchemaBuilder();
-    return getColumnHeaders(builder);
+    getColumnHeaders(builder);
   }
 
-  private TupleMetadata getColumnHeaders(SchemaBuilder builder) {
+  private void getColumnHeaders(SchemaBuilder builder) {
     //Get the field names
-    int columnCount = 0;
+    int columnCount;
 
-    // Case for empty sheet.
-    if (sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
-      return builder.buildSchema();
+    // Case for empty sheet
+    if (sheet.getLastRowNum() == 0) {
+      builder.buildSchema();
+      return;
     }
 
+    rowIterator = sheet.iterator();
+
     // Get the number of columns.
     columnCount = getColumnCount();
 
-    excelFieldNames = new ArrayList<>(columnCount);
-    cellWriterArray = new ArrayList<>(columnCount);
-    rowIterator = sheet.iterator();
+    excelFieldNames = new ArrayList<>();
+    cellWriterArray = new ArrayList<>();
 
     //If there are no headers, create columns names of field_n
     if (readerConfig.headerRow == -1) {
       String missingFieldName;
-      for (int i = 0; i < columnCount; i++) {
+      int i = 0;
+
+      for (Cell c : currentRow) {
         missingFieldName = MISSING_FIELD_NAME_HEADER + (i + 1);
         makeColumn(builder, missingFieldName, TypeProtos.MinorType.VARCHAR);
         excelFieldNames.add(i, missingFieldName);
+        i++;
       }
-      columnWriters = new ArrayList<>(excelFieldNames.size());
-      cellTypes = new ArrayList<>(excelFieldNames.size());
+      columnWriters = new ArrayList<>(columnCount);
 
-      return builder.buildSchema();
+      builder.buildSchema();
     } else if (rowIterator.hasNext()) {
-      //Find the header row
-      int firstHeaderRow = getFirstHeaderRow();
-
-      while (lineCount < firstHeaderRow) {
-        Row row = rowIterator.next();
-        lineCount++;
-      }
       //Get the header row and column count
-      Row row = rowIterator.next();
-      totalColumnCount = row.getLastCellNum();
-      cellTypes = new ArrayList<>(totalColumnCount);
+      totalColumnCount = currentRow.getLastCellNum();
+      Cell dataCell = null;
 
       //Read the header row
-      Iterator<Cell> cellIterator = row.cellIterator();
+      Iterator<Cell> headerRowIterator = currentRow.cellIterator();
       int colPosition = 0;
-      String tempColumnName = "";
+      String tempColumnName;
 
-      while (cellIterator.hasNext()) {
-        Cell cell = cellIterator.next();
+      // Get the first data row.
+      currentRow = rowIterator.next();
+      Row firstDataRow = currentRow;
+      Iterator<Cell> dataRowIterator = firstDataRow.cellIterator();
+
+
+      while (headerRowIterator.hasNext()) {
+        // We need this to get the header names
+        Cell cell = headerRowIterator.next();
+
+        // Since header names are most likely all Strings, we need the first row of actual data to get the data types
+        try {
+          dataCell = dataRowIterator.next();
+        } catch (NoSuchElementException e) {
+          // Do nothing... empty value in data cell
+        }
 
-        CellValue cellValue = evaluator.evaluate(cell);
-        switch (cellValue.getCellType()) {
+        switch (dataCell.getCellType()) {
           case STRING:
             tempColumnName = cell.getStringCellValue()
               .replace(PARSER_WILDCARD, SAFE_WILDCARD)
@@ -227,28 +236,26 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
               .replaceAll("\\n", HEADER_NEW_LINE_REPLACEMENT);
             makeColumn(builder, tempColumnName, TypeProtos.MinorType.VARCHAR);
             excelFieldNames.add(colPosition, tempColumnName);
-            cellTypes.add(CellType.STRING);
             break;
-          case NUMERIC:
-            tempColumnName = String.valueOf(cell.getNumericCellValue());
+          case FORMULA:
+            case NUMERIC:
+            tempColumnName = cell.getStringCellValue();
             makeColumn(builder, tempColumnName, TypeProtos.MinorType.FLOAT8);
             excelFieldNames.add(colPosition, tempColumnName);
-            cellTypes.add(CellType.NUMERIC);
             break;
         }
         colPosition++;
       }
     }
-    columnWriters = new ArrayList<>(excelFieldNames.size());
-    return builder.buildSchema();
+    columnWriters = new ArrayList<>();
+    builder.buildSchema();
   }
 
   /**
    * Helper function to get the selected sheet from the configuration
-   *
-   * @return XSSFSheet The selected sheet
+   * @return Sheet The selected sheet
    */
-  private XSSFSheet getSheet() {
+  private Sheet getSheet() {
     int sheetIndex = 0;
     if (!readerConfig.sheetName.isEmpty()) {
       sheetIndex = workbook.getSheetIndex(readerConfig.sheetName);
@@ -267,14 +274,21 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   /**
    * Returns the column count.  There are a few gotchas here in that we have to know the header row and count the physical number of cells
-   * in that row.  Since the user can define the header row,
+   * in that row.  This function also has to move the rowIterator object to the first row of data.
    * @return The number of actual columns
    */
   private int getColumnCount() {
+    // Initialize
+    currentRow = rowIterator.next();
     int rowNumber = readerConfig.headerRow > 0 ? sheet.getFirstRowNum() : 0;
-    XSSFRow sheetRow = sheet.getRow(rowNumber);
 
-    return sheetRow != null ? sheetRow.getPhysicalNumberOfCells() : 0;
+    // If the headerRow is greater than zero, advance the iterator to the first row of data
+    // This is unfortunately necessary since the streaming reader eliminated the getRow() method.
+    for (int i = 1; i < rowNumber; i++) {
+      currentRow = rowIterator.next();
+    }
+
+    return currentRow.getPhysicalNumberOfCells();
   }
 
   @Override
@@ -289,83 +303,78 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private boolean nextLine(RowSetLoader rowWriter) {
-    if( sheet.getFirstRowNum() == 0 && sheet.getLastRowNum() == 0) {
+    if (sheet.getLastRowNum() == 0) {
       // Case for empty sheet
       return false;
-    } else if (!rowIterator.hasNext()) {
-      return false;
     } else if (recordCount >= readerConfig.lastRow) {
       return false;
     }
 
-    int lastRow = readerConfig.lastRow;
-    if (recordCount < lastRow && rowIterator.hasNext()) {
-      lineCount++;
-
-      Row row = rowIterator.next();
-
-      // If the user specified that there are no headers, get the column count
-      if (readerConfig.headerRow == -1 && recordCount == 0) {
-        this.totalColumnCount = row.getLastCellNum();
-      }
+    // If the user specified that there are no headers, get the column count
+    if (readerConfig.headerRow == -1 && recordCount == 0) {
+      totalColumnCount = currentRow.getLastCellNum();
+    }
 
-      int colPosition = 0;
-      if (readerConfig.firstColumn != 0) {
-        colPosition = readerConfig.firstColumn - 1;
-      }
+    int colPosition = 0;
+    if (readerConfig.firstColumn != 0) {
+      colPosition = readerConfig.firstColumn - 1;
+    }
 
-      int finalColumn = totalColumnCount;
-      if (readerConfig.lastColumn != 0) {
-        finalColumn = readerConfig.lastColumn - 1;
-      }
-      rowWriter.start();
-      for (int colWriterIndex = 0; colPosition < finalColumn; colPosition++) {
-        Cell cell = row.getCell(colPosition);
+    int finalColumn = totalColumnCount;
+    if (readerConfig.lastColumn != 0) {
+      finalColumn = readerConfig.lastColumn - 1;
+    }
+    rowWriter.start();
+    for (int colWriterIndex = 0; colPosition < finalColumn; colWriterIndex++) {
+      Cell cell = currentRow.getCell(colPosition);
 
-        CellValue cellValue = evaluator.evaluate(cell);
+      populateColumnArray(cell, colPosition);
+      cellWriterArray.get(colWriterIndex).load(cell);
 
-        populateColumnArray(cell, cellValue, colPosition);
-        cellWriterArray.get(colWriterIndex).load(cell);
+      colPosition++;
+    }
 
-        colWriterIndex++;
-      }
+    if (firstLine) {
+      firstLine = false;
+    }
+    rowWriter.save();
+    recordCount++;
 
-      if (firstLine) {
-        firstLine = false;
-      }
-      rowWriter.save();
-      recordCount++;
-      return true;
-    } else {
+    if (!rowIterator.hasNext()) {
       return false;
+    } else {
+      currentRow = rowIterator.next();
+      return true;
     }
-
   }
 
   /**
    * Function to populate the column array
    * @param cell The input cell object
-   * @param cellValue The cell value
    * @param colPosition The index of the column
    */
-  private void populateColumnArray(Cell cell, CellValue cellValue, int colPosition) {
+  private void populateColumnArray(Cell cell, int colPosition) {
     if (!firstLine) {
       return;
     }
 
-    if (cellValue == null) {
+    // Case for empty data cell in first row.  In this case, fall back to string.
+    if (cell == null) {
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+      return;
+    }
+
+    CellType cellType = cell.getCellType();
+    if (cellType == CellType.STRING || readerConfig.allTextMode) {
       addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
+    } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
+      // Case if the column is a date or time
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
+    } else if (cellType == CellType.NUMERIC || cellType == CellType.FORMULA) {
+      // Case if the column is numeric
+      addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
     } else {
-      CellType cellType = cellValue.getCellType();
-      if (cellType == CellType.STRING || readerConfig.allTextMode) {
-        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.VARCHAR);
-      } else if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
-        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.TIMESTAMP);
-      } else if (cellType == CellType.NUMERIC) {
-        addColumnToArray(rowWriter, excelFieldNames.get(colPosition), MinorType.FLOAT8);
-      } else {
-        logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
-      }
+      logger.warn("Unknown data type. Drill only supports reading NUMERIC and STRING.");
     }
   }
 
@@ -411,28 +420,13 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  /**
-   * Returns the index of the first row of actual data.  This function is to be used to find the header row as the POI skips blank rows.
-   * @return The headerRow index, corrected for blank rows
-   */
-  private int getFirstHeaderRow() {
-    int firstRow = sheet.getFirstRowNum();
-    int headerRow = readerConfig.headerRow;
-
-    if (headerRow < 0) {
-      return firstRow;
-    } else {
-      return headerRow;
-    }
-  }
-
   @Override
   public void close() {
     if (workbook != null) {
       try {
         workbook.close();
       } catch (IOException e) {
-        logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage());
+        logger.warn("Error when closing Excel Workbook resource: {}", e.getMessage());
       }
       workbook = null;
     }
@@ -441,13 +435,13 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
       try {
         fsStream.close();
       } catch (IOException e) {
-        logger.warn("Error when closing XSSFWorkbook resource: {}", e.getMessage());
+        logger.warn("Error when closing Excel File Stream resource: {}", e.getMessage());
       }
       fsStream = null;
     }
   }
 
-  public class CellWriter {
+  public static class CellWriter {
     ScalarWriter columnWriter;
 
     CellWriter(ScalarWriter columnWriter) {
@@ -463,11 +457,10 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
 
     public void load(Cell cell) {
-      CellValue cellValue = evaluator.evaluate(cell);
-      if (cellValue == null) {
+      if (cell == null) {
         columnWriter.setNull();
       } else {
-        String fieldValue = cellValue.getStringValue();
+        String fieldValue = cell.getStringCellValue();
         if (fieldValue == null && readerConfig.allTextMode) {
           fieldValue = String.valueOf(cell.getNumericCellValue());
         }
@@ -476,52 +469,47 @@ public class ExcelBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public class NumericStringWriter extends ExcelBatchReader.CellWriter {
+  public static class NumericStringWriter extends ExcelBatchReader.CellWriter {
     NumericStringWriter(ScalarWriter columnWriter) {
       super(columnWriter);
     }
 
     public void load(Cell cell) {
-      String fieldValue = String.valueOf(cell.getNumericCellValue());
-
-      if (fieldValue == null) {
+      if (cell == null) {
         columnWriter.setNull();
       } else {
+        String fieldValue = String.valueOf(cell.getNumericCellValue());
         columnWriter.setString(fieldValue);
       }
     }
   }
 
-  public class NumericCellWriter extends ExcelBatchReader.CellWriter {
+  public static class NumericCellWriter extends ExcelBatchReader.CellWriter {
     NumericCellWriter(ScalarWriter columnWriter) {
       super(columnWriter);
     }
 
     public void load(Cell cell) {
-      CellValue cellValue = evaluator.evaluate(cell);
-      if (cellValue == null) {
+      if (cell == null) {
         columnWriter.setNull();
       } else {
-        double fieldNumValue = cellValue.getNumberValue();
+        double fieldNumValue = cell.getNumericCellValue();
         columnWriter.setDouble(fieldNumValue);
       }
     }
   }
 
-  public class TimestampCellWriter extends ExcelBatchReader.CellWriter {
+  public static class TimestampCellWriter extends ExcelBatchReader.CellWriter {
     TimestampCellWriter(ScalarWriter columnWriter) {
       super(columnWriter);
     }
 
     public void load(Cell cell) {
-      CellValue cellValue = evaluator.evaluate(cell);
-
-      if (cellValue == null) {
+      if (cell == null) {
         columnWriter.setNull();
       } else {
-        logger.debug("Cell value: {}", cellValue.getNumberValue());
-
-        Date dt = DateUtil.getJavaDate(cellValue.getNumberValue(), TimeZone.getTimeZone("UTC"));
+        logger.debug("Cell value: {}", cell.getNumericCellValue());
+        Date dt = DateUtil.getJavaDate(cell.getNumericCellValue(), TimeZone.getTimeZone("UTC"));
         Instant timeStamp = new Instant(dt.toInstant().getEpochSecond() * 1000);
         columnWriter.setTimestamp(timeStamp);
       }
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index fb7df5c..5700b40 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -73,7 +73,8 @@ public class TestExcelFormat extends ClusterTest {
 
     testBuilder()
       .sqlQuery(sql)
-      .ordered().baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order")
+      .unOrdered()
+      .baselineColumns("id", "first_name", "last_name", "email", "gender", "birthdate", "balance", "order_count", "average_order")
       .baselineValues(1.0, "Cornelia", "Matej", "cmatej0@mtv.com", "Female", "10/31/1974", 735.29, 22.0, 33.42227272727273)
       .baselineValues(2.0, "Nydia", "Heintsch", "nheintsch1@godaddy.com", "Female", "12/10/1966", 784.14, 22.0, 35.64272727272727)
       .baselineValues(3.0, "Waiter", "Sherel", "wsherel2@utexas.edu", "Male", "3/12/1961", 172.36, 17.0, 10.138823529411766)
@@ -320,7 +321,7 @@ public class TestExcelFormat extends ClusterTest {
 
     testBuilder()
       .sqlQuery(sql)
-      .ordered()
+      .unOrdered()
       .baselineColumns("col1", "col2", "col3")
       .baselineValues(1.0,2.0,null)
       .baselineValues(2.0,4.0,null)
@@ -339,11 +340,12 @@ public class TestExcelFormat extends ClusterTest {
 
     testBuilder()
       .sqlQuery(sql)
-      .ordered().baselineColumns("col1", "col2")
-      .baselineValues("1.0", "Bob")
-      .baselineValues("2.0", "Steve")
-      .baselineValues("3.0", "Anne")
-      .baselineValues("Bob", "3.0")
+      .unOrdered()
+      .baselineColumns("col1", "col2")
+      .baselineValues("1", "Bob")
+      .baselineValues("2", "Steve")
+      .baselineValues("3", "Anne")
+      .baselineValues("Bob", "3")
       .go();
   }
 


[drill] 03/04: DRILL-7672: Make metadata type required when reading from / writing into Drill Metastore

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9173b0f8430237308f1134005defe1b5df7721f0
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed Mar 25 16:42:53 2020 +0200

    DRILL-7672: Make metadata type required when reading from / writing into Drill Metastore
    
    1. Upgraded Iceberg version and removed unneeded code for In / NotIn Expressions.
    2. Updated Metastore Read / Modify interfaces to support required metadata types:
     a. introduced abstract Read / Modify classes with boilerplate code;
     b. added delete operation with filter and metadata type;
     c. added metadata type validator which checks supported metadata types for each component;
     d. made purge operation terminal;
     e. made necessary changes in README.md files.
    3. Added / updated unit tests.
    
    closes #2042
---
 .../analyze/FileMetadataInfoCollector.java         |   5 +-
 .../impl/metadata/MetadataControllerBatch.java     |   9 +-
 .../MetastoreDropTableMetadataHandler.java         |   9 +-
 .../drill/exec/store/ischema/RecordCollector.java  |  31 ++--
 .../drill/exec/sql/TestMetastoreCommands.java      |  18 +--
 metastore/iceberg-metastore/pom.xml                |   8 +-
 .../iceberg/components/tables/IcebergTables.java   |  19 +--
 .../iceberg/components/tables/TableKey.java        |  11 +-
 .../tables/TablesOperationTransformer.java         |   5 +-
 .../metastore/iceberg/operate/IcebergModify.java   |  49 +++---
 .../metastore/iceberg/operate/IcebergRead.java     |  43 +++--
 .../iceberg/schema/IcebergTableSchema.java         |  16 +-
 .../iceberg/transform/FilterExpressionVisitor.java |  29 ++--
 .../iceberg/transform/FilterTransformer.java       |  36 ++++-
 .../iceberg/transform/InputDataTransformer.java    |   1 -
 .../iceberg/transform/OperationTransformer.java    |  14 +-
 .../components/tables/TestBasicRequests.java       |  11 +-
 .../tables/TestIcebergTablesMetastore.java         |  44 ++++--
 .../iceberg/components/tables/TestTableKey.java    |   9 +-
 .../tables/TestTablesOperationTransformer.java     |  25 ++-
 .../iceberg/schema/TestIcebergTableSchema.java     | 141 +++++++++--------
 .../iceberg/transform/TestFilterTransformer.java   | 175 +++++++++++++++------
 metastore/metastore-api/README.md                  |  71 ++++++---
 .../apache/drill/metastore/MetastoreColumn.java    |  77 +++++++++
 .../drill/metastore/MetastoreFieldDefinition.java  |   7 +
 .../components/tables/BasicTablesRequests.java     | 130 +++++++--------
 .../components/tables/TableMetadataUnit.java       | 123 ++++++++-------
 .../tables/TablesMetadataTypeValidator.java}       |  39 ++---
 .../metastore/expressions/FilterExpression.java    |  50 +++---
 .../drill/metastore/expressions/IsPredicate.java   |  22 +--
 .../drill/metastore/expressions/ListPredicate.java |  24 +--
 .../metastore/expressions/SimplePredicate.java     |  40 ++---
 .../drill/metastore/metadata/MetadataInfo.java     |   3 -
 .../apache/drill/metastore/metadata/TableInfo.java |  11 +-
 .../drill/metastore/operate/AbstractModify.java    |  54 +++++++
 .../drill/metastore/operate/AbstractRead.java      |  71 +++++++++
 .../org/apache/drill/metastore/operate/Delete.java |  88 +++++++++++
 .../metastore/operate/MetadataTypeValidator.java   |  60 +++++++
 .../org/apache/drill/metastore/operate/Modify.java |  31 ++--
 .../org/apache/drill/metastore/operate/Read.java   |  22 ++-
 .../components/tables/TestBasicTablesRequests.java |  41 +++--
 .../tables/TestTablesMetadataTypeValidator.java    |  76 +++++++++
 42 files changed, 1218 insertions(+), 530 deletions(-)
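
As an orientation aid, here is a rough sketch (not part of the patch) of how the reworked
Modify and Read APIs are driven once a Tables handle has been obtained from the Metastore
registry; it mirrors the calls visible in MetastoreDropTableMetadataHandler and
RecordCollector below, and the `tables` / `tableInfo` parameters are assumed to be
supplied by the caller:

    import java.util.List;

    import org.apache.drill.metastore.components.tables.BasicTablesRequests;
    import org.apache.drill.metastore.components.tables.TableMetadataUnit;
    import org.apache.drill.metastore.components.tables.Tables;
    import org.apache.drill.metastore.metadata.MetadataType;
    import org.apache.drill.metastore.metadata.TableInfo;
    import org.apache.drill.metastore.operate.Delete;

    public class MetastoreUsageSketch {

      // Deleting metadata now requires stating which metadata types the
      // delete applies to, alongside the filter.
      public static void dropAllTableMetadata(Tables tables, TableInfo tableInfo) {
        tables.modify()
            .delete(Delete.builder()
                .metadataType(MetadataType.ALL)
                .filter(tableInfo.toFilter())
                .build())
            .execute();
      }

      // Reading through BasicTablesRequests likewise carries the requested
      // metadata types in the request itself.
      public static List<TableMetadataUnit> readSegmentsAndPartitions(Tables tables, TableInfo tableInfo) {
        BasicTablesRequests.RequestMetadata request = BasicTablesRequests.RequestMetadata.builder()
            .tableInfo(tableInfo)
            .metadataTypes(MetadataType.SEGMENT, MetadataType.PARTITION)
            .build();
        return tables.basicRequests().request(request);
      }
    }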

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
index cc156f0..d720b31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.BasicTablesRequests;
 import org.apache.drill.metastore.metadata.MetadataInfo;
 import org.apache.drill.metastore.metadata.MetadataType;
@@ -289,8 +290,8 @@ public class FileMetadataInfoCollector implements MetadataInfoCollector {
         .tableInfo(tableInfo)
         .metadataKeys(metadataKeys)
         .paths(allFiles)
-        .metadataType(MetadataType.ROW_GROUP.name())
-        .requestColumns(Arrays.asList(MetadataInfo.METADATA_KEY, MetadataInfo.METADATA_IDENTIFIER, MetadataInfo.METADATA_TYPE))
+        .metadataType(MetadataType.ROW_GROUP)
+        .requestColumns(Arrays.asList(MetastoreColumn.METADATA_KEY, MetastoreColumn.METADATA_IDENTIFIER, MetastoreColumn.METADATA_TYPE))
         .build();
 
     return basicRequests.request(requestMetadata).stream()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 8ee3beb..9c0e8af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -64,6 +64,7 @@ import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.components.tables.Tables;
@@ -77,6 +78,7 @@ import org.apache.drill.metastore.metadata.PartitionMetadata;
 import org.apache.drill.metastore.metadata.RowGroupMetadata;
 import org.apache.drill.metastore.metadata.SegmentMetadata;
 import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.operate.Delete;
 import org.apache.drill.metastore.operate.Modify;
 import org.apache.drill.metastore.statistics.BaseStatisticsKind;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
@@ -233,12 +235,15 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
 
     for (MetadataInfo metadataInfo : popConfig.getContext().metadataToRemove()) {
       deleteFilter = FilterExpression.and(deleteFilter,
-          FilterExpression.equal(MetadataInfo.METADATA_KEY, metadataInfo.key()));
+          FilterExpression.equal(MetastoreColumn.METADATA_KEY, metadataInfo.key()));
     }
 
     Modify<TableMetadataUnit> modify = tables.modify();
     if (!popConfig.getContext().metadataToRemove().isEmpty()) {
-      modify.delete(deleteFilter);
+      modify.delete(Delete.builder()
+        .metadataType(MetadataType.SEGMENT, MetadataType.FILE, MetadataType.ROW_GROUP, MetadataType.PARTITION)
+        .filter(deleteFilter)
+        .build());
     }
 
     MetastoreTableInfo metastoreTableInfo = popConfig.getContext().metastoreTableInfo();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreDropTableMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreDropTableMetadataHandler.java
index 3f1dbea..77b682f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreDropTableMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreDropTableMetadataHandler.java
@@ -30,7 +30,9 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
 import org.apache.drill.metastore.components.tables.Tables;
 import org.apache.drill.metastore.exceptions.MetastoreException;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.operate.Delete;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,8 +86,11 @@ public class MetastoreDropTableMetadataHandler extends DefaultSqlHandler {
       }
 
       tables.modify()
-          .delete(tableInfo.toFilter())
-          .execute();
+        .delete(Delete.builder()
+          .metadataType(MetadataType.ALL)
+          .filter(tableInfo.toFilter())
+          .build())
+        .execute();
     } catch (MetastoreException e) {
       logger.error("Error when dropping metadata for table {}", dropTableMetadata.getName(), e);
       return DirectPlan.createDirectPlan(context, false, e.getMessage());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index da4f474..107a544 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -33,14 +33,15 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.BasicTablesRequests;
 import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.metadata.BaseTableMetadata;
 import org.apache.drill.metastore.metadata.MetadataInfo;
 import org.apache.drill.metastore.metadata.MetadataType;
-import org.apache.drill.metastore.metadata.TableInfo;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -233,7 +234,6 @@ public interface RecordCollector {
     private static final Logger logger = getLogger(MetastoreRecordCollector.class);
 
     public static final int UNDEFINED_INDEX = -1;
-    public static final String SCHEMA = "schema";
 
     private final Metastore metastore;
     private final FilterEvaluator filterEvaluator;
@@ -262,8 +262,8 @@ public interface RecordCollector {
         try {
           baseTableMetadata = metastore.tables().basicRequests()
             .tablesMetadata(FilterExpression.and(
-              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
-              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1))));
+              FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(MetastoreColumn.WORKSPACE, drillSchema.getSchemaPath().get(1))));
         } catch (Exception e) {
           // ignore all exceptions related to Metastore data retrieval, return empty result
           logger.warn("Error while retrieving Metastore table data: {}", e.getMessage());
@@ -293,10 +293,10 @@ public interface RecordCollector {
         try {
           baseTableMetadata = metastore.tables().basicRequests()
             .tablesMetadata(FilterExpression.and(
-              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
-              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1)),
+              FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(MetastoreColumn.WORKSPACE, drillSchema.getSchemaPath().get(1)),
               // exclude tables without schema
-              FilterExpression.isNotNull(SCHEMA)));
+              FilterExpression.isNotNull(MetastoreColumn.SCHEMA)));
         } catch (Exception e) {
           // ignore all exceptions related to Metastore data retrieval, return empty result
           logger.warn("Error while retrieving Metastore table data: {}", e.getMessage());
@@ -366,15 +366,16 @@ public interface RecordCollector {
 
         BasicTablesTransformer.MetadataHolder metadataHolder;
         try {
-          List<TableMetadataUnit> units = metastore.tables().read()
-            .filter(FilterExpression.and(
-              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
-              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1)),
-              // include SEGMENT and PARTITION data only
-              FilterExpression.in(MetadataInfo.METADATA_TYPE, MetadataType.SEGMENT.name(), MetadataType.PARTITION.name()),
+          BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
+            .metadataTypes(MetadataType.SEGMENT, MetadataType.PARTITION)
+            .customFilter(FilterExpression.and(
+              FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(MetastoreColumn.WORKSPACE, drillSchema.getSchemaPath().get(1)),
               // exclude DEFAULT_SEGMENT (used only for non-partitioned tables)
-              FilterExpression.notEqual(MetadataInfo.METADATA_KEY, MetadataInfo.DEFAULT_SEGMENT_KEY)))
-            .execute();
+              FilterExpression.notEqual(MetastoreColumn.METADATA_KEY, MetadataInfo.DEFAULT_SEGMENT_KEY)))
+            .build();
+
+          List<TableMetadataUnit> units = metastore.tables().basicRequests().request(requestMetadata);
 
           metadataHolder = BasicTablesTransformer.all(units);
         } catch (Exception e) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 92472cf..b25e2b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -38,6 +38,7 @@ import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.metastore.metadata.RowGroupMetadata;
 import org.apache.drill.metastore.metadata.SegmentMetadata;
 import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.operate.Delete;
 import org.apache.drill.metastore.statistics.BaseStatisticsKind;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
 import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
@@ -257,7 +258,10 @@ public class TestMetastoreCommands extends ClusterTest {
           .get()
           .tables()
           .modify()
-          .delete(tableInfo.toFilter())
+          .delete(Delete.builder()
+            .metadataType(MetadataType.ALL)
+            .filter(tableInfo.toFilter())
+            .build())
           .execute();
       run("drop table if exists dfs.tmp.`%s`", tableName);
       client.resetSession(ExecConstants.METASTORE_ENABLED);
@@ -552,12 +556,11 @@ public class TestMetastoreCommands extends ClusterTest {
             .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
             .go();
 
-        List<String> emptyMetadataLevels = Arrays.stream(MetadataType.values())
+        List<MetadataType> emptyMetadataLevels = Arrays.stream(MetadataType.values())
             .filter(metadataType -> metadataType.compareTo(analyzeLevel) > 0
                 // for the case when there are no segment metadata, default segment is present
                 && metadataType.compareTo(MetadataType.SEGMENT) > 0
                 && metadataType.compareTo(MetadataType.ALL) < 0)
-            .map(Enum::name)
             .collect(Collectors.toList());
 
         BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
@@ -566,12 +569,9 @@ public class TestMetastoreCommands extends ClusterTest {
             .build();
 
         List<TableMetadataUnit> metadataUnitList = cluster.drillbit().getContext()
-            .getMetastoreRegistry()
-            .get()
-            .tables()
-            .read()
-            .filter(requestMetadata.filter())
-            .execute();
+          .getMetastoreRegistry().get().tables()
+          .basicRequests()
+          .request(requestMetadata);
 
         assertTrue(
             String.format("Some metadata [%s] for [%s] analyze query level is present" + metadataUnitList, emptyMetadataLevels, analyzeLevel),
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index aa68271..173f681 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -31,7 +31,7 @@
   <name>metastore/Drill Iceberg Metastore</name>
 
   <properties>
-    <iceberg.version>2d75130</iceberg.version>
+    <iceberg.version>93d51b9</iceberg.version>
     <caffeine.version>2.7.0</caffeine.version>
   </properties>
 
@@ -139,6 +139,12 @@
       <version>${asm.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.ow2.asm</groupId>
+      <artifactId>asm-tree</artifactId>
+      <version>${asm.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index a28248d..860a14e 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.metastore.iceberg.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.tables.TablesMetadataTypeValidator;
 import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
+import org.apache.drill.metastore.iceberg.operate.IcebergRead;
 import org.apache.drill.metastore.operate.Metadata;
 import org.apache.drill.metastore.operate.Modify;
 import org.apache.drill.metastore.operate.Read;
@@ -27,7 +30,6 @@ import org.apache.drill.metastore.iceberg.MetastoreContext;
 import org.apache.drill.metastore.iceberg.operate.IcebergMetadata;
 import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
 import org.apache.drill.metastore.iceberg.operate.IcebergModify;
-import org.apache.drill.metastore.iceberg.operate.IcebergRead;
 import org.apache.drill.metastore.iceberg.transform.Transformer;
 import org.apache.drill.metastore.iceberg.write.FileWriter;
 import org.apache.drill.metastore.iceberg.write.ParquetFileWriter;
@@ -42,16 +44,15 @@ import java.util.List;
  */
 public class IcebergTables implements Tables, MetastoreContext<TableMetadataUnit> {
 
-  public static final String STORAGE_PLUGIN = "storagePlugin";
-  public static final String WORKSPACE = "workspace";
-  public static final String TABLE_NAME = "tableName";
-  public static final String METADATA_KEY = "metadataKey";
-
   /**
    * Metastore Tables component partition keys, order of partitioning will be determined based
    * on order in {@link List} holder.
    */
-  private static final List<String> PARTITION_KEYS = Arrays.asList(STORAGE_PLUGIN, WORKSPACE, TABLE_NAME, METADATA_KEY);
+  private static final List<MetastoreColumn> PARTITION_KEYS = Arrays.asList(
+    MetastoreColumn.STORAGE_PLUGIN,
+    MetastoreColumn.WORKSPACE,
+    MetastoreColumn.TABLE_NAME,
+    MetastoreColumn.METADATA_KEY);
 
   public static IcebergTableSchema SCHEMA = IcebergTableSchema.of(TableMetadataUnit.class, PARTITION_KEYS);
 
@@ -74,12 +75,12 @@ public class IcebergTables implements Tables, MetastoreContext<TableMetadataUnit
 
   @Override
   public Read<TableMetadataUnit> read() {
-    return new IcebergRead<>(context());
+    return new IcebergRead<>(TablesMetadataTypeValidator.INSTANCE, context());
   }
 
   @Override
   public Modify<TableMetadataUnit> modify() {
-    return new IcebergModify<>(context());
+    return new IcebergModify<>(TablesMetadataTypeValidator.INSTANCE, context());
   }
 
   @Override
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TableKey.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TableKey.java
index 452c0b8..5802d18 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TableKey.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TableKey.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.iceberg.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.hadoop.fs.Path;
 
@@ -77,11 +78,11 @@ public class TableKey {
    *
   * @return map with condition references and values
    */
-  public Map<String, Object> toFilterConditions() {
-    Map<String, Object> conditions = new HashMap<>();
-    conditions.put(IcebergTables.STORAGE_PLUGIN, storagePlugin);
-    conditions.put(IcebergTables.WORKSPACE, workspace);
-    conditions.put(IcebergTables.TABLE_NAME, tableName);
+  public Map<MetastoreColumn, Object> toFilterConditions() {
+    Map<MetastoreColumn, Object> conditions = new HashMap<>();
+    conditions.put(MetastoreColumn.STORAGE_PLUGIN, storagePlugin);
+    conditions.put(MetastoreColumn.WORKSPACE, workspace);
+    conditions.put(MetastoreColumn.TABLE_NAME, tableName);
     return conditions;
   }
 
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
index 67d1bc4..e9fca18 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.iceberg.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.iceberg.MetastoreContext;
 import org.apache.drill.metastore.iceberg.operate.Overwrite;
@@ -61,8 +62,8 @@ public class TablesOperationTransformer extends OperationTransformer<TableMetada
 
           String location = tableKey.toLocation(context.table().location());
 
-          Map<String, Object> filterConditions = new HashMap<>(tableKey.toFilterConditions());
-          filterConditions.put(IcebergTables.METADATA_KEY, operationEntry.getKey());
+          Map<MetastoreColumn, Object> filterConditions = new HashMap<>(tableKey.toFilterConditions());
+          filterConditions.put(MetastoreColumn.METADATA_KEY, operationEntry.getKey());
           Expression expression = context.transformer().filter().transform(filterConditions);
 
           return toOverwrite(location, expression, operationEntry.getValue());
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index e29571f..a06f749 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -17,65 +17,54 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
-import org.apache.drill.metastore.operate.Modify;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.iceberg.MetastoreContext;
 import org.apache.drill.metastore.iceberg.transform.OperationTransformer;
+import org.apache.drill.metastore.operate.AbstractModify;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Modify;
 import org.apache.iceberg.Transaction;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
- * Implementation of {@link Modify} interface.
+ * Implementation of {@link Modify} interface based on {@link AbstractModify} parent class.
  * Modifies information in Iceberg table based on given overwrite or delete operations.
  * Executes given operations in one transaction.
  *
  * @param <T> Metastore component unit type
  */
-public class IcebergModify<T> implements Modify<T> {
+public class IcebergModify<T> extends AbstractModify<T> {
 
   private final MetastoreContext<T> context;
-  private final List<T> overwriteUnits = new ArrayList<>();
-  private final List<FilterExpression> deleteFilters = new ArrayList<>();
-  private boolean purge = false;
 
-  public IcebergModify(MetastoreContext<T> context) {
+  public IcebergModify(MetadataTypeValidator metadataTypeValidator, MetastoreContext<T> context) {
+    super(metadataTypeValidator);
     this.context = context;
   }
 
   @Override
-  public Modify<T> overwrite(List<T> units) {
-    overwriteUnits.addAll(units);
-    return this;
-  }
-
-  @Override
-  public Modify<T> delete(FilterExpression filter) {
-    deleteFilters.add(filter);
-    return this;
-  }
-
-  @Override
-  public Modify<T> purge() {
-    purge = true;
-    return this;
-  }
-
-  @Override
   public void execute() {
     OperationTransformer<T> transformer = context.transformer().operation();
     List<IcebergOperation> operations = new ArrayList<>(transformer.toOverwrite(overwriteUnits));
-    operations.addAll(transformer.toDelete(deleteFilters));
-
-    if (purge) {
-      operations.add(transformer.toDelete((FilterExpression) null));
-    }
+    operations.addAll(transformer.toDelete(deletes));
 
     if (operations.isEmpty()) {
       return;
     }
 
+    executeOperations(operations);
+  }
+
+  @Override
+  public void purge() {
+    executeOperations(Collections.singletonList(
+      context.transformer().operation().toDelete((FilterExpression) null)));
+  }
+
+  private void executeOperations(List<IcebergOperation> operations) {
     Transaction transaction = context.table().newTransaction();
     operations.forEach(op -> op.add(transaction));
     transaction.commitTransaction();
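
A minimal caller-side sketch of the new contract, mirroring TestIcebergTablesMetastore below:
delete and overwrite operations are staged and committed in one transaction by execute(), while
purge() now runs immediately on its own. Delete, MetadataType, MetastoreColumn and
FilterExpression are the metastore API classes imported in the tests; "unit" is a placeholder
for a TableMetadataUnit built elsewhere.

    tables.modify()
      .delete(Delete.builder()
        .metadataType(MetadataType.SEGMENT)
        .filter(FilterExpression.equal(MetastoreColumn.METADATA_KEY, "dir0"))
        .build())
      .overwrite(unit)
      .execute();

    // purge() is no longer staged for execute(); it executes by itself
    tables.modify().purge();
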
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
index 520a993..0610ec9 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
@@ -17,32 +17,32 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
-import org.apache.drill.metastore.operate.Read;
-import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.transform.FilterTransformer;
+import org.apache.drill.metastore.operate.AbstractRead;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Read;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.types.Types;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Implementation of {@link Read} interface.
+ * Implementation of the {@link Read} interface based on the {@link AbstractRead} parent class.
  * Reads information from Iceberg table based on given filter expression.
  * Supports reading information for specific columns.
- *
- * @param <T> Metastore component unit type
  */
-public class IcebergRead<T> implements Read<T> {
+public class IcebergRead<T> extends AbstractRead<T> {
 
   private final MetastoreContext<T> context;
   private final String[] defaultColumns;
-  private final List<String> columns = new ArrayList<>();
-  private FilterExpression filter;
 
-  public IcebergRead(MetastoreContext<T> context) {
+  public IcebergRead(MetadataTypeValidator metadataTypeValidator, MetastoreContext<T> context) {
+    super(metadataTypeValidator);
     this.context = context;
     this.defaultColumns = context.table().schema().columns().stream()
       .map(Types.NestedField::name)
@@ -50,23 +50,20 @@ public class IcebergRead<T> implements Read<T> {
   }
 
   @Override
-  public Read<T> filter(FilterExpression filter) {
-    this.filter = filter;
-    return this;
-  }
+  protected List<T> internalExecute() {
+    String[] selectedColumns = columns.isEmpty()
+      ? defaultColumns
+      : columns.stream()
+         .map(MetastoreColumn::columnName)
+         .toArray(String[]::new);
 
-  @Override
-  public Read<T> columns(List<String> columns) {
-    this.columns.addAll(columns);
-    return this;
-  }
+    FilterTransformer filterTransformer = context.transformer().filter();
+    Expression rowFilter = filterTransformer.combine(
+      filterTransformer.transform(metadataTypes), filterTransformer.transform(filter));
 
-  @Override
-  public List<T> execute() {
-    String[] selectedColumns = columns.isEmpty() ? defaultColumns : columns.toArray(new String[0]);
     Iterable<Record> records = IcebergGenerics.read(context.table())
       .select(selectedColumns)
-      .where(context.transformer().filter().transform(filter))
+      .where(rowFilter)
       .build();
 
     return context.transformer().outputData()
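
A minimal read-side sketch, mirroring the updated tests below: the metadata type is now mandatory,
and columns are selected through MetastoreColumn constants whose columnName() values are resolved
against the Iceberg schema.

    List<TableMetadataUnit> units = tables.read()
      .metadataType(MetadataType.TABLE)
      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
      .execute();
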
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/schema/IcebergTableSchema.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/schema/IcebergTableSchema.java
index 832998a..11ebd85 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/schema/IcebergTableSchema.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/schema/IcebergTableSchema.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.iceberg.schema;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.MetastoreFieldDefinition;
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
@@ -73,7 +74,7 @@ public class IcebergTableSchema {
    * @param partitionKeys list of partition keys
    * @return instance of Iceberg table schema
    */
-  public static IcebergTableSchema of(Class<?> clazz, List<String> partitionKeys) {
+  public static IcebergTableSchema of(Class<?> clazz, List<MetastoreColumn> partitionKeys) {
     List<Types.NestedField> tableSchemaFields = new ArrayList<>();
     Types.NestedField[] partitionSpecSchemaFields = new Types.NestedField[partitionKeys.size()];
 
@@ -81,10 +82,13 @@ public class IcebergTableSchema {
     int complexTypesIndex = STARTING_COMPLEX_TYPES_INDEX;
 
     for (Field field : clazz.getDeclaredFields()) {
-      if (!field.isAnnotationPresent(MetastoreFieldDefinition.class)) {
+      MetastoreFieldDefinition definition = field.getAnnotation(MetastoreFieldDefinition.class);
+      if (definition == null) {
         continue;
       }
 
+      MetastoreColumn column = definition.column();
+
       String typeSimpleName = field.getType().getSimpleName().toLowerCase();
       org.apache.iceberg.types.Type icebergType = JAVA_TO_ICEBERG_TYPE_MAP.get(typeSimpleName);
 
@@ -111,11 +115,11 @@ public class IcebergTableSchema {
           "Unexpected type for class [%s]: %s", clazz.getCanonicalName(), typeSimpleName));
       }
 
-      Types.NestedField icebergField = Types.NestedField.optional(schemaIndex++, field.getName(), icebergType);
+      Types.NestedField icebergField = Types.NestedField.optional(schemaIndex++, column.columnName(), icebergType);
 
       tableSchemaFields.add(icebergField);
 
-      int partitionIndex = partitionKeys.indexOf(field.getName());
+      int partitionIndex = partitionKeys.indexOf(column);
       if (partitionIndex != -1) {
         partitionSpecSchemaFields[partitionIndex] = icebergField;
       }
@@ -161,8 +165,8 @@ public class IcebergTableSchema {
       return PartitionSpec.unpartitioned();
     }
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    schema.columns().
-      forEach(column -> builder.identity(column.name()));
+    schema.columns()
+      .forEach(column -> builder.identity(column.name()));
     return builder.build();
   }
 }
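
For illustration, a hypothetical annotated unit field and the corresponding schema call; the
annotation members (column, scopes) are the ones exercised by the bytecode-generated classes in
TestIcebergTableSchema below, and the partition keys match those used there. The actual
TableMetadataUnit fields are not part of this patch.

    @MetastoreFieldDefinition(column = MetastoreColumn.TABLE_NAME, scopes = {MetadataType.ALL})
    private String tableName;

    IcebergTableSchema schema = IcebergTableSchema.of(TableMetadataUnit.class,
      Arrays.asList(MetastoreColumn.STORAGE_PLUGIN, MetastoreColumn.WORKSPACE, MetastoreColumn.TABLE_NAME));
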
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterExpressionVisitor.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterExpressionVisitor.java
index b582b05..565a4e7 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterExpressionVisitor.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterExpressionVisitor.java
@@ -26,8 +26,6 @@ import org.apache.drill.metastore.expressions.SingleExpressionPredicate;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 
-import java.util.List;
-
 /**
  * Visits {@link FilterExpression} implementations and transforms them into Iceberg {@link Expression}.
  */
@@ -41,53 +39,52 @@ public class FilterExpressionVisitor implements FilterExpression.Visitor<Express
 
   @Override
   public Expression visit(SimplePredicate.Equal<?> expression) {
-    return Expressions.equal(expression.reference(), expression.value());
+    return Expressions.equal(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(SimplePredicate.NotEqual<?> expression) {
-    return Expressions.notEqual(expression.reference(), expression.value());
+    return Expressions.notEqual(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(SimplePredicate.LessThan<?> expression) {
-    return Expressions.lessThan(expression.reference(), expression.value());
+    return Expressions.lessThan(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(SimplePredicate.LessThanOrEqual<?> expression) {
-    return Expressions.lessThanOrEqual(expression.reference(), expression.value());
+    return Expressions.lessThanOrEqual(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(SimplePredicate.GreaterThan<?> expression) {
-    return Expressions.greaterThan(expression.reference(), expression.value());
+    return Expressions.greaterThan(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(SimplePredicate.GreaterThanOrEqual<?> expression) {
-    return Expressions.greaterThanOrEqual(expression.reference(), expression.value());
+    return Expressions.greaterThanOrEqual(expression.column().columnName(), expression.value());
   }
 
   @Override
   public Expression visit(ListPredicate.In<?> expression) {
-    return toInExpression(expression.reference(), expression.values());
+    return Expressions.in(expression.column().columnName(), expression.values());
   }
 
   @Override
   public Expression visit(ListPredicate.NotIn<?> expression) {
-    Expression in = toInExpression(expression.reference(), expression.values());
-    return Expressions.not(in);
+    return Expressions.notIn(expression.column().columnName(), expression.values());
   }
 
   @Override
   public Expression visit(IsPredicate.IsNull expression) {
-    return Expressions.isNull(expression.reference());
+    return Expressions.isNull(expression.column().columnName());
   }
 
   @Override
   public Expression visit(IsPredicate.IsNotNull expression) {
-    return Expressions.notNull(expression.reference());
+    return Expressions.notNull(expression.column().columnName());
   }
 
   @Override
@@ -109,10 +106,4 @@ public class FilterExpressionVisitor implements FilterExpression.Visitor<Express
     Expression left = expression.left().accept(this);
     return Expressions.or(right, left);
   }
-
-  private Expression toInExpression(String reference, List<?> values) {
-    return values.stream()
-      .map(value -> (Expression) Expressions.equal(reference, value))
-      .reduce(Expressions.alwaysFalse(), Expressions::or);
-  }
 }
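
A minimal sketch of the simplification above: IN and NOT IN predicates now map directly to
Iceberg's native in/notIn expressions instead of being expanded into OR chains of equals.
Column names come from MetastoreColumn.columnName(), as in TestFilterTransformer below.

    Expression in = new FilterTransformer().transform(
      FilterExpression.in(MetastoreColumn.STORAGE_PLUGIN, "dfs", "s3"));
    // equivalent to Expressions.in(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs", "s3")
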
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
index 5ee753c..03fa171 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
@@ -17,10 +17,13 @@
  */
 package org.apache.drill.metastore.iceberg.transform;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -37,13 +40,13 @@ public class FilterTransformer {
     return filter == null ? Expressions.alwaysTrue() : filter.accept(visitor);
   }
 
-  public Expression transform(Map<String, Object> conditions) {
+  public Expression transform(Map<MetastoreColumn, Object> conditions) {
     if (conditions == null || conditions.isEmpty()) {
       return Expressions.alwaysTrue();
     }
 
     List<Expression> expressions = conditions.entrySet().stream()
-      .map(entry -> Expressions.equal(entry.getKey(), entry.getValue()))
+      .map(entry -> Expressions.equal(entry.getKey().columnName(), entry.getValue()))
       .collect(Collectors.toList());
 
     if (expressions.size() == 1) {
@@ -53,4 +56,33 @@ public class FilterTransformer {
     return Expressions.and(expressions.get(0), expressions.get(1),
       expressions.subList(2, expressions.size()).toArray(new Expression[0]));
   }
+
+  public Expression transform(List<MetadataType> metadataTypes) {
+    if (metadataTypes.contains(MetadataType.ALL)) {
+      return Expressions.alwaysTrue();
+    }
+
+    List<String> inConditionValues = metadataTypes.stream()
+      .map(Enum::name)
+      .collect(Collectors.toList());
+
+    if (inConditionValues.size() == 1) {
+      return Expressions.equal(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues.get(0));
+    }
+
+    return Expressions.in(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues);
+  }
+
+  public Expression combine(Expression... expressions) {
+    if (expressions.length == 0) {
+      return Expressions.alwaysTrue();
+    }
+
+    if (expressions.length == 1) {
+      return expressions[0];
+    }
+
+    return Expressions.and(expressions[0], expressions[1],
+      Arrays.copyOfRange(expressions, 2, expressions.length));
+  }
 }
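
A minimal sketch of how the two new helpers are meant to be used together, mirroring
IcebergRead.internalExecute above; the metadata types and "filter" are placeholders for whatever
the caller supplies.

    FilterTransformer ft = context.transformer().filter();
    Expression rowFilter = ft.combine(
      ft.transform(Arrays.asList(MetadataType.SEGMENT, MetadataType.FILE)),  // metadataType IN (...)
      ft.transform(filter));                                                 // user-provided FilterExpression
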
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/InputDataTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/InputDataTransformer.java
index ca0350e..2fef850 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/InputDataTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/InputDataTransformer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.metastore.iceberg.transform;
 
-
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
index c410c76..ab27771 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
@@ -64,16 +64,18 @@ public abstract class OperationTransformer<T> {
     return new Overwrite(dataFile, expression);
   }
 
-  public List<Delete> toDelete(List<FilterExpression> filters) {
-    return filters.stream()
-      .map(this::toDelete)
-      .collect(Collectors.toList());
-  }
-
   public Delete toDelete(FilterExpression filter) {
     return new Delete(context.transformer().filter().transform(filter));
   }
 
+  public List<Delete> toDelete(List<org.apache.drill.metastore.operate.Delete> deletes) {
+    FilterTransformer filterTransformer = context.transformer().filter();
+    return deletes.stream()
+      // metadata types are ignored during delete since they are not part of the partition key
+      .map(delete -> new Delete(filterTransformer.transform(delete.filter())))
+      .collect(Collectors.toList());
+  }
+
   /**
    * Converts given list of Metastore components units into list of overwrite operations.
    * Specific for each Metastore component.
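
A minimal sketch of the new toDelete(List) overload, using the metastore-level Delete builder shown
in the tests below (the fully qualified name distinguishes it from the Iceberg-level Delete
operation returned by the transformer); as noted in the comment above, the delete's metadata types
are not translated into the Iceberg filter.

    org.apache.drill.metastore.operate.Delete delete = org.apache.drill.metastore.operate.Delete.builder()
      .metadataType(MetadataType.SEGMENT)
      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
      .build();
    List<Delete> operations = transformer.toDelete(Collections.singletonList(delete));
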
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java
index 183c74a..4947113 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java
@@ -18,6 +18,7 @@
 package org.apache.drill.metastore.iceberg.components.tables;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.BasicTablesRequests;
 import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
 import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
@@ -114,8 +115,8 @@ public class TestBasicRequests extends IcebergBaseTest {
   public void testTablesMetadataAbsent() {
     List<BaseTableMetadata> tablesMetadata = basicRequests.tablesMetadata(
       FilterExpression.and(
-        FilterExpression.equal(IcebergTables.STORAGE_PLUGIN, "dfs"),
-        FilterExpression.equal(IcebergTables.WORKSPACE, "absent")));
+        FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+        FilterExpression.equal(MetastoreColumn.WORKSPACE, "absent")));
     assertTrue(tablesMetadata.isEmpty());
   }
 
@@ -123,8 +124,8 @@ public class TestBasicRequests extends IcebergBaseTest {
   public void testTablesMetadataExisting() {
     List<BaseTableMetadata> baseTableMetadata = basicRequests.tablesMetadata(
       FilterExpression.and(
-        FilterExpression.equal(IcebergTables.STORAGE_PLUGIN, "dfs"),
-        FilterExpression.equal(IcebergTables.WORKSPACE, "tmp")));
+        FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+        FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp")));
     assertTrue(baseTableMetadata.size() > 1);
   }
 
@@ -455,7 +456,7 @@ public class TestBasicRequests extends IcebergBaseTest {
   public void testCustomRequest() {
     BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
       .column("n_nation")
-      .metadataType(MetadataType.PARTITION.name())
+      .metadataType(MetadataType.PARTITION)
       .build();
 
     List<TableMetadataUnit> units = basicRequests.request(requestMetadata);
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
index cf69805..cd01386 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
@@ -19,7 +19,10 @@ package org.apache.drill.metastore.iceberg.components.tables;
 
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.operate.Delete;
 import org.apache.drill.metastore.operate.Metadata;
 import org.apache.drill.metastore.Metastore;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
@@ -230,6 +233,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.ALL)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -260,8 +264,9 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.ALL)
       .filter(tableInfo.toFilter())
-      .columns("tableName", "metadataKey")
+      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
       .execute();
 
     assertEquals(1, units.size());
@@ -274,8 +279,9 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
     Tables tables = new IcebergMetastore(config).tables();
 
     List<TableMetadataUnit> units = tables.read()
-      .filter(FilterExpression.equal("storagePlugin", "dfs"))
-      .columns("tableName", "metadataKey")
+      .metadataType(MetadataType.ALL)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
       .execute();
 
     assertTrue(units.isEmpty());
@@ -297,6 +303,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir0")
+      .metadataType(MetadataType.TABLE.name())
       .tableType("parquet")
       .build();
 
@@ -305,6 +312,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.TABLE)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -316,6 +324,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace("tmp")
       .tableName("nation")
       .metadataKey("dir0")
+      .metadataType(MetadataType.TABLE.name())
       .tableType("text")
       .build();
 
@@ -324,6 +333,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.TABLE)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -347,6 +357,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir0")
+      .metadataType(MetadataType.SEGMENT.name())
       .build();
 
     TableMetadataUnit secondUnit = TableMetadataUnit.builder()
@@ -354,6 +365,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir1")
+      .metadataType(MetadataType.SEGMENT.name())
       .build();
 
     tables.modify()
@@ -361,6 +373,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.SEGMENT)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -368,13 +381,17 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
 
     FilterExpression deleteFilter = FilterExpression.and(
       tableInfo.toFilter(),
-      FilterExpression.equal("metadataKey", "dir0"));
+      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "dir0"));
 
     tables.modify()
-      .delete(deleteFilter)
+      .delete(Delete.builder()
+        .metadataType(MetadataType.SEGMENT)
+        .filter(deleteFilter)
+        .build())
       .execute();
 
     List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.SEGMENT)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -398,6 +415,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir0")
+      .metadataType(MetadataType.SEGMENT.name())
       .tableType("parquet")
       .build();
 
@@ -406,6 +424,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir1")
+      .metadataType(MetadataType.SEGMENT.name())
       .tableType("parquet")
       .build();
 
@@ -414,6 +433,7 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.SEGMENT)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -421,22 +441,27 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
 
     FilterExpression deleteFilter = FilterExpression.and(
       tableInfo.toFilter(),
-      FilterExpression.equal("metadataKey", "dir0"));
+      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "dir0"));
 
     TableMetadataUnit updatedUnit = TableMetadataUnit.builder()
       .storagePlugin(tableInfo.storagePlugin())
       .workspace(tableInfo.workspace())
       .tableName(tableInfo.name())
       .metadataKey("dir1")
+      .metadataType(MetadataType.SEGMENT.name())
       .tableType("text")
       .build();
 
     tables.modify()
-      .delete(deleteFilter)
+      .delete(Delete.builder()
+        .metadataType(MetadataType.SEGMENT)
+        .filter(deleteFilter)
+        .build())
       .overwrite(updatedUnit)
       .execute();
 
     List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.SEGMENT)
       .filter(tableInfo.toFilter())
       .execute();
 
@@ -470,15 +495,16 @@ public class TestIcebergTablesMetastore extends IcebergBaseTest {
       .execute();
 
     List<TableMetadataUnit> initialUnits = tables.read()
+      .metadataType(MetadataType.ALL)
       .execute();
 
     assertEquals(2, initialUnits.size());
 
     tables.modify()
-      .purge()
-      .execute();
+      .purge();
 
     List<TableMetadataUnit> resultingUnits = tables.read()
+      .metadataType(MetadataType.ALL)
       .execute();
 
     assertTrue(resultingUnits.isEmpty());
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
index 3e96ca2..30d22e3 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.iceberg.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
 import org.apache.hadoop.fs.Path;
@@ -58,10 +59,10 @@ public class TestTableKey extends IcebergBaseTest {
   public void testToFilterConditions() {
     TableKey tableKey = new TableKey("dfs", "tmp", "nation");
 
-    Map<String, Object> expected = new HashMap<>();
-    expected.put(IcebergTables.STORAGE_PLUGIN, "dfs");
-    expected.put(IcebergTables.WORKSPACE, "tmp");
-    expected.put(IcebergTables.TABLE_NAME, "nation");
+    Map<MetastoreColumn, Object> expected = new HashMap<>();
+    expected.put(MetastoreColumn.STORAGE_PLUGIN, "dfs");
+    expected.put(MetastoreColumn.WORKSPACE, "tmp");
+    expected.put(MetastoreColumn.TABLE_NAME, "nation");
 
     assertEquals(expected, tableKey.toFilterConditions());
   }
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
index 80c6f77..ac444fc 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
@@ -18,6 +18,7 @@
 package org.apache.drill.metastore.iceberg.components.tables;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
@@ -26,6 +27,7 @@ import org.apache.drill.metastore.iceberg.operate.Delete;
 import org.apache.drill.metastore.iceberg.operate.Overwrite;
 import org.apache.drill.metastore.iceberg.transform.FilterTransformer;
 import org.apache.drill.metastore.iceberg.transform.OperationTransformer;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -61,8 +63,8 @@ public class TestTablesOperationTransformer extends IcebergBaseTest {
 
     TableKey tableKey = new TableKey(unit.storagePlugin(), unit.workspace(), unit.tableName());
 
-    Map<String, Object> filterConditions = new HashMap<>(tableKey.toFilterConditions());
-    filterConditions.put(IcebergTables.METADATA_KEY, unit.metadataKey());
+    Map<MetastoreColumn, Object> filterConditions = new HashMap<>(tableKey.toFilterConditions());
+    filterConditions.put(MetastoreColumn.METADATA_KEY, unit.metadataKey());
 
     String location = tableKey.toLocation(TestTablesOperationTransformer.location);
     Expression expression = new FilterTransformer().transform(filterConditions);
@@ -93,12 +95,12 @@ public class TestTablesOperationTransformer extends IcebergBaseTest {
   @Test
   public void testToDeleteOperation() {
     FilterExpression filter = FilterExpression.and(
-      FilterExpression.equal("storagePlugin", "dfs"),
-      FilterExpression.equal("workspace", "tmp"));
+      FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+      FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"));
 
     Expression expected = Expressions.and(
-      Expressions.equal(IcebergTables.STORAGE_PLUGIN, "dfs"),
-      Expressions.equal(IcebergTables.WORKSPACE, "tmp"));
+      Expressions.equal(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+      Expressions.equal(MetastoreColumn.WORKSPACE.columnName(), "tmp"));
 
     Delete operation = transformer.toDelete(filter);
 
@@ -107,8 +109,15 @@ public class TestTablesOperationTransformer extends IcebergBaseTest {
 
   @Test
   public void testToDeleteOperations() {
-    FilterExpression dfs = FilterExpression.equal("storagePlugin", "dfs");
-    FilterExpression s3 = FilterExpression.equal("storagePlugin", "s3");
+    org.apache.drill.metastore.operate.Delete dfs = org.apache.drill.metastore.operate.Delete.builder()
+      .metadataType(MetadataType.ALL)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+      .build();
+
+    org.apache.drill.metastore.operate.Delete s3 = org.apache.drill.metastore.operate.Delete.builder()
+      .metadataType(MetadataType.ALL)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "s3"))
+      .build();
 
     List<Delete> operations = transformer.toDelete(Arrays.asList(dfs, s3));
 
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/schema/TestIcebergTableSchema.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/schema/TestIcebergTableSchema.java
index 3c66ba1..6c3d360 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/schema/TestIcebergTableSchema.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/schema/TestIcebergTableSchema.java
@@ -17,19 +17,23 @@
  */
 package org.apache.drill.metastore.iceberg.schema;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.MetastoreFieldDefinition;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
 import org.junit.Test;
+import org.objectweb.asm.AnnotationVisitor;
 import org.objectweb.asm.ClassWriter;
 import org.objectweb.asm.FieldVisitor;
 import org.objectweb.asm.Opcodes;
 import org.objectweb.asm.Type;
 import org.objectweb.asm.signature.SignatureVisitor;
 import org.objectweb.asm.signature.SignatureWriter;
+import org.objectweb.asm.tree.AnnotationNode;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,32 +54,32 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
 
       @Override
       void addFields(ClassWriter classWriter) {
-        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, "stringField", String.class);
-        annotate(stringField);
+        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.STORAGE_PLUGIN, String.class);
+        annotate(stringField, MetastoreColumn.STORAGE_PLUGIN, MetadataType.ALL);
 
-        FieldVisitor intField = addField(classWriter, Opcodes.ACC_PRIVATE, "intField", int.class);
-        annotate(intField);
+        FieldVisitor intField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.COLUMN, int.class);
+        annotate(intField, MetastoreColumn.COLUMN, MetadataType.TABLE);
 
-        FieldVisitor integerField = addField(classWriter, Opcodes.ACC_PRIVATE, "integerField", Integer.class);
-        annotate(integerField);
+        FieldVisitor integerField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.ROW_GROUP_INDEX, Integer.class);
+        annotate(integerField, MetastoreColumn.ROW_GROUP_INDEX, MetadataType.SEGMENT);
 
-        FieldVisitor longField = addField(classWriter, Opcodes.ACC_PRIVATE, "longField", Long.class);
-        annotate(longField);
+        FieldVisitor longField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.LAST_MODIFIED_TIME, Long.class);
+        annotate(longField, MetastoreColumn.LAST_MODIFIED_TIME, MetadataType.FILE);
 
-        FieldVisitor floatField = addField(classWriter, Opcodes.ACC_PRIVATE, "floatField", Float.class);
-        annotate(floatField);
+        FieldVisitor floatField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.COLUMNS_STATISTICS, Float.class);
+        annotate(floatField, MetastoreColumn.COLUMNS_STATISTICS, MetadataType.ALL);
 
-        FieldVisitor doubleField = addField(classWriter, Opcodes.ACC_PRIVATE, "doubleField", Double.class);
-        annotate(doubleField);
+        FieldVisitor doubleField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.ADDITIONAL_METADATA, Double.class);
+        annotate(doubleField, MetastoreColumn.ADDITIONAL_METADATA, MetadataType.TABLE);
 
-        FieldVisitor booleanField = addField(classWriter, Opcodes.ACC_PRIVATE, "booleanField", Boolean.class);
-        annotate(booleanField);
+        FieldVisitor booleanField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.INTERESTING_COLUMNS, Boolean.class);
+        annotate(booleanField, MetastoreColumn.INTERESTING_COLUMNS, MetadataType.SEGMENT);
 
-        FieldVisitor listField = addField(classWriter, Opcodes.ACC_PRIVATE, "listField", List.class, String.class);
-        annotate(listField);
+        FieldVisitor listField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.LOCATIONS, List.class, String.class);
+        annotate(listField, MetastoreColumn.LOCATIONS, MetadataType.PARTITION);
 
-        FieldVisitor mapField = addField(classWriter, Opcodes.ACC_PRIVATE, "mapField", Map.class, String.class, Float.class);
-        annotate(mapField);
+        FieldVisitor mapField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.HOST_AFFINITY, Map.class, String.class, Float.class);
+        annotate(mapField, MetastoreColumn.HOST_AFFINITY, MetadataType.ROW_GROUP);
       }
 
     }.generate();
@@ -86,16 +90,16 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
     int complexTypesIndex = IcebergTableSchema.STARTING_COMPLEX_TYPES_INDEX;
 
     Schema expectedSchema = new Schema(
-      Types.NestedField.optional(schemaIndex++, "stringField", Types.StringType.get()),
-      Types.NestedField.optional(schemaIndex++, "intField", Types.IntegerType.get()),
-      Types.NestedField.optional(schemaIndex++, "integerField", Types.IntegerType.get()),
-      Types.NestedField.optional(schemaIndex++, "longField", Types.LongType.get()),
-      Types.NestedField.optional(schemaIndex++, "floatField", Types.FloatType.get()),
-      Types.NestedField.optional(schemaIndex++, "doubleField", Types.DoubleType.get()),
-      Types.NestedField.optional(schemaIndex++, "booleanField", Types.BooleanType.get()),
-      Types.NestedField.optional(schemaIndex++, "listField",
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.STORAGE_PLUGIN.columnName(), Types.StringType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.COLUMN.columnName(), Types.IntegerType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.ROW_GROUP_INDEX.columnName(), Types.IntegerType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.LAST_MODIFIED_TIME.columnName(), Types.LongType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.COLUMNS_STATISTICS.columnName(), Types.FloatType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.ADDITIONAL_METADATA.columnName(), Types.DoubleType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.INTERESTING_COLUMNS.columnName(), Types.BooleanType.get()),
+      Types.NestedField.optional(schemaIndex++, MetastoreColumn.LOCATIONS.columnName(),
         Types.ListType.ofOptional(complexTypesIndex++, Types.StringType.get())),
-      Types.NestedField.optional(schemaIndex, "mapField",
+      Types.NestedField.optional(schemaIndex, MetastoreColumn.HOST_AFFINITY.columnName(),
         Types.MapType.ofOptional(complexTypesIndex++, complexTypesIndex, Types.StringType.get(), Types.FloatType.get())));
 
     assertEquals(expectedSchema.asStruct(), schema.tableSchema().asStruct());
@@ -107,16 +111,16 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
 
       @Override
       void addFields(ClassWriter classWriter) {
-        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, "stringField", String.class);
-        annotate(stringField);
+        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.STORAGE_PLUGIN, String.class);
+        annotate(stringField, MetastoreColumn.STORAGE_PLUGIN, MetadataType.ALL);
 
-        addField(classWriter, Opcodes.ACC_PRIVATE, "integerField", Integer.class);
+        addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.ADDITIONAL_METADATA, Integer.class);
       }
     }.generate();
 
     IcebergTableSchema schema = IcebergTableSchema.of(clazz, Collections.emptyList());
-    assertNotNull(schema.tableSchema().findField("stringField"));
-    assertNull(schema.tableSchema().findField("integerField"));
+    assertNotNull(schema.tableSchema().findField(MetastoreColumn.STORAGE_PLUGIN.columnName()));
+    assertNull(schema.tableSchema().findField(MetastoreColumn.ADDITIONAL_METADATA.columnName()));
   }
 
   @Test
@@ -141,8 +145,8 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
             .buildSignature();
 
         FieldVisitor listField =
-            classWriter.visitField(Opcodes.ACC_PRIVATE, "stringField", descriptor, signature, null);
-        annotate(listField);
+            classWriter.visitField(Opcodes.ACC_PRIVATE, MetastoreColumn.ADDITIONAL_METADATA.columnName(), descriptor, signature, null);
+        annotate(listField, MetastoreColumn.ADDITIONAL_METADATA, MetadataType.ALL);
       }
     }.generate();
 
@@ -157,13 +161,13 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
 
       @Override
       void addFields(ClassWriter classWriter) {
-        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, "stringField", String.class);
-        annotate(stringField);
+        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.STORAGE_PLUGIN, String.class);
+        annotate(stringField, MetastoreColumn.STORAGE_PLUGIN, MetadataType.ALL);
       }
     }.generate();
 
     IcebergTableSchema schema = IcebergTableSchema.of(clazz, Collections.emptyList());
-    assertNotNull(schema.tableSchema().findField("stringField"));
+    assertNotNull(schema.tableSchema().findField(MetastoreColumn.STORAGE_PLUGIN.columnName()));
 
     assertEquals(PartitionSpec.unpartitioned(), schema.partitionSpec());
   }
@@ -174,36 +178,37 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
 
       @Override
       void addFields(ClassWriter classWriter) {
-        FieldVisitor partKey1 = addField(classWriter, Opcodes.ACC_PRIVATE, "partKey1", String.class);
-        annotate(partKey1);
+        FieldVisitor partKey1 = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.STORAGE_PLUGIN, String.class);
+        annotate(partKey1, MetastoreColumn.STORAGE_PLUGIN, MetadataType.ALL);
 
-        FieldVisitor partKey2 = addField(classWriter, Opcodes.ACC_PRIVATE, "partKey2", String.class);
-        annotate(partKey2);
+        FieldVisitor partKey2 = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.WORKSPACE, String.class);
+        annotate(partKey2, MetastoreColumn.WORKSPACE, MetadataType.ALL);
 
-        FieldVisitor partKey3 = addField(classWriter, Opcodes.ACC_PRIVATE, "partKey3", String.class);
-        annotate(partKey3);
+        FieldVisitor partKey3 = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.TABLE_NAME, String.class);
+        annotate(partKey3, MetastoreColumn.TABLE_NAME, MetadataType.ALL);
 
-        FieldVisitor integerField = addField(classWriter, Opcodes.ACC_PRIVATE, "integerField", Integer.class);
-        annotate(integerField);
+        FieldVisitor integerField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.ROW_GROUP_INDEX, Integer.class);
+        annotate(integerField, MetastoreColumn.ROW_GROUP_INDEX, MetadataType.ROW_GROUP);
 
-        FieldVisitor booleanField = addField(classWriter, Opcodes.ACC_PRIVATE, "booleanField", Boolean.class);
-        annotate(booleanField);
+        FieldVisitor stringField = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.OWNER, Boolean.class);
+        annotate(stringField, MetastoreColumn.OWNER, MetadataType.TABLE);
       }
     }.generate();
 
-    IcebergTableSchema schema = IcebergTableSchema.of(clazz, Arrays.asList("partKey1", "partKey2", "partKey3"));
+    IcebergTableSchema schema = IcebergTableSchema.of(clazz,
+      Arrays.asList(MetastoreColumn.STORAGE_PLUGIN, MetastoreColumn.WORKSPACE, MetastoreColumn.TABLE_NAME));
 
-    Types.NestedField partKey1 = schema.tableSchema().findField("partKey1");
+    Types.NestedField partKey1 = schema.tableSchema().findField(MetastoreColumn.STORAGE_PLUGIN.columnName());
     assertNotNull(partKey1);
 
-    Types.NestedField partKey2 = schema.tableSchema().findField("partKey2");
+    Types.NestedField partKey2 = schema.tableSchema().findField(MetastoreColumn.WORKSPACE.columnName());
     assertNotNull(partKey2);
 
-    Types.NestedField partKey3 = schema.tableSchema().findField("partKey3");
+    Types.NestedField partKey3 = schema.tableSchema().findField(MetastoreColumn.TABLE_NAME.columnName());
     assertNotNull(partKey3);
 
-    assertNotNull(schema.tableSchema().findField("integerField"));
-    assertNotNull(schema.tableSchema().findField("booleanField"));
+    assertNotNull(schema.tableSchema().findField(MetastoreColumn.ROW_GROUP_INDEX.columnName()));
+    assertNotNull(schema.tableSchema().findField(MetastoreColumn.OWNER.columnName()));
 
     Schema partitionSchema = new Schema(partKey1, partKey2, partKey3);
     PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(partitionSchema)
@@ -221,17 +226,17 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
 
       @Override
       void addFields(ClassWriter classWriter) {
-        FieldVisitor partKey1 = addField(classWriter, Opcodes.ACC_PRIVATE, "partKey1", String.class);
-        annotate(partKey1);
+        FieldVisitor storagePlugin = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.STORAGE_PLUGIN, String.class);
+        annotate(storagePlugin, MetastoreColumn.STORAGE_PLUGIN, MetadataType.ALL);
 
-        FieldVisitor integerField = addField(classWriter, Opcodes.ACC_PRIVATE, "integerField", Integer.class);
-        annotate(integerField);
+        FieldVisitor tableName = addField(classWriter, Opcodes.ACC_PRIVATE, MetastoreColumn.TABLE_NAME, Integer.class);
+        annotate(tableName, MetastoreColumn.TABLE_NAME, MetadataType.ALL);
       }
     }.generate();
 
     thrown.expect(IcebergMetastoreException.class);
 
-    IcebergTableSchema.of(clazz, Arrays.asList("partKey1", "partKey2"));
+    IcebergTableSchema.of(clazz, Arrays.asList(MetastoreColumn.STORAGE_PLUGIN, MetastoreColumn.WORKSPACE));
   }
 
   /**
@@ -257,7 +262,7 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
       }.injectClass(name, bytes);
     }
 
-    public FieldVisitor addField(ClassWriter classWriter, int access, String fieldName, Class<?> clazz, Class<?>... genericTypes) {
+    public FieldVisitor addField(ClassWriter classWriter, int access, MetastoreColumn column, Class<?> clazz, Class<?>... genericTypes) {
       String descriptor = Type.getType(clazz).getDescriptor();
 
       String signature = null;
@@ -277,11 +282,23 @@ public class TestIcebergTableSchema extends IcebergBaseTest {
             .buildSignature();
       }
 
-      return classWriter.visitField(access, fieldName, descriptor, signature, null);
+      return classWriter.visitField(access, column.columnName(), descriptor, signature, null);
     }
 
-    void annotate(FieldVisitor field) {
-      field.visitAnnotation(Type.getType(MetastoreFieldDefinition.class).getDescriptor(), true);
+    void annotate(FieldVisitor field, MetastoreColumn column, MetadataType... scopes) {
+      String annotationDescriptor = Type.getType(MetastoreFieldDefinition.class).getDescriptor();
+      AnnotationNode annotationNode = new AnnotationNode(annotationDescriptor);
+
+      annotationNode.visitEnum("column", Type.getType(MetastoreColumn.class).getDescriptor(), column.name());
+      annotationNode.visitEnd();
+
+      AnnotationVisitor scopesNode = annotationNode.visitArray("scopes");
+      Arrays.stream(scopes)
+        .forEach(scope -> scopesNode.visitEnum("scopes", Type.getType(MetadataType.class).getDescriptor(), scope.name()));
+      scopesNode.visitEnd();
+
+      AnnotationVisitor annotationVisitor = field.visitAnnotation(annotationDescriptor, true);
+      annotationNode.accept(annotationVisitor);
     }
 
     private ClassWriter generateClass() {
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
index 112b07b..487bbab 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
@@ -17,15 +17,18 @@
  */
 package org.apache.drill.metastore.iceberg.transform;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -49,89 +52,88 @@ public class TestFilterTransformer extends IcebergBaseTest {
 
   @Test
   public void testToFilterEqual() {
-    Expression expected = Expressions.equal("a", 1);
-    Expression actual = transformer.transform(FilterExpression.equal("a", 1));
+    Expression expected = Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.equal(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterNotEqual() {
-    Expression expected = Expressions.notEqual("a", 1);
-    Expression actual = transformer.transform(FilterExpression.notEqual("a", 1));
+    Expression expected = Expressions.notEqual(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.notEqual(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterLessThan() {
-    Expression expected = Expressions.lessThan("a", 1);
-    Expression actual = transformer.transform(FilterExpression.lessThan("a", 1));
+    Expression expected = Expressions.lessThan(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.lessThan(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterLessThanOrEqual() {
-    Expression expected = Expressions.lessThanOrEqual("a", 1);
-    Expression actual = transformer.transform(FilterExpression.lessThanOrEqual("a", 1));
+    Expression expected = Expressions.lessThanOrEqual(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.lessThanOrEqual(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterGreaterThan() {
-    Expression expected = Expressions.greaterThan("a", 1);
-    Expression actual = transformer.transform(FilterExpression.greaterThan("a", 1));
+    Expression expected = Expressions.greaterThan(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.greaterThan(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterGreaterThanOrEqual() {
-    Expression expected = Expressions.greaterThanOrEqual("a", 1);
-    Expression actual = transformer.transform(FilterExpression.greaterThanOrEqual("a", 1));
+    Expression expected = Expressions.greaterThanOrEqual(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1);
+    Expression actual = transformer.transform(FilterExpression.greaterThanOrEqual(MetastoreColumn.ROW_GROUP_INDEX, 1));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterIn() {
-    Expression expected = Expressions.or(Expressions.equal("a", 1), Expressions.equal("a", 2));
-    Expression actual = transformer.transform(FilterExpression.in("a", 1, 2));
+    Expression expected = Expressions.in(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1, 2);
+    Expression actual = transformer.transform(FilterExpression.in(MetastoreColumn.ROW_GROUP_INDEX, 1, 2));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterNotIn() {
-    Expression expected = Expressions.not(
-      Expressions.or(Expressions.equal("a", 1), Expressions.equal("a", 2)));
-    Expression actual = transformer.transform(FilterExpression.notIn("a", 1, 2));
+    Expression expected = Expressions.notIn(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1, 2);
+    Expression actual = transformer.transform(FilterExpression.notIn(MetastoreColumn.ROW_GROUP_INDEX, 1, 2));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterIsNull() {
-    Expression expected = Expressions.isNull("a");
-    Expression actual = transformer.transform(FilterExpression.isNull("a"));
+    Expression expected = Expressions.isNull(MetastoreColumn.ROW_GROUP_INDEX.columnName());
+    Expression actual = transformer.transform(FilterExpression.isNull(MetastoreColumn.ROW_GROUP_INDEX));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterIsNotNull() {
-    Expression expected = Expressions.notNull("a");
-    Expression actual = transformer.transform(FilterExpression.isNotNull("a"));
+    Expression expected = Expressions.notNull(MetastoreColumn.ROW_GROUP_INDEX.columnName());
+    Expression actual = transformer.transform(FilterExpression.isNotNull(MetastoreColumn.ROW_GROUP_INDEX));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterNot() {
-    Expression expected = Expressions.not(Expressions.equal("a", 1));
-    Expression actual = transformer.transform(FilterExpression.not(FilterExpression.equal("a", 1)));
+    Expression expected = Expressions.not(Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1));
+    Expression actual = transformer.transform(FilterExpression.not(FilterExpression.equal(MetastoreColumn.ROW_GROUP_INDEX, 1)));
 
     assertEquals(expected.toString(), actual.toString());
   }
@@ -139,21 +141,29 @@ public class TestFilterTransformer extends IcebergBaseTest {
   @Test
   public void testToFilterAnd() {
     Expression expected = Expressions.and(
-      Expressions.equal("a", 1), Expressions.equal("b", 2),
-      Expressions.equal("c", 3), Expressions.equal("d", 4));
+      Expressions.equal(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+      Expressions.equal(MetastoreColumn.WORKSPACE.columnName(), "tmp"),
+      Expressions.equal(MetastoreColumn.TABLE_NAME.columnName(), "nation"),
+      Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 4));
 
     Expression actual = transformer.transform(FilterExpression.and(
-      FilterExpression.equal("a", 1), FilterExpression.equal("b", 2),
-      FilterExpression.equal("c", 3), FilterExpression.equal("d", 4)));
+      FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+      FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"),
+      FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"),
+      FilterExpression.equal(MetastoreColumn.ROW_GROUP_INDEX, 4)));
 
     assertEquals(expected.toString(), actual.toString());
   }
 
   @Test
   public void testToFilterOr() {
-    Expression expected = Expressions.or(Expressions.equal("a", 1), Expressions.equal("a", 2));
+    Expression expected = Expressions.or(
+      Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1),
+      Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 2));
     Expression actual = transformer.transform(
-      FilterExpression.or(FilterExpression.equal("a", 1), FilterExpression.equal("a", 2)));
+      FilterExpression.or(
+        FilterExpression.equal(MetastoreColumn.ROW_GROUP_INDEX, 1),
+        FilterExpression.equal(MetastoreColumn.ROW_GROUP_INDEX, 2)));
 
     assertEquals(expected.toString(), actual.toString());
   }
@@ -177,7 +187,7 @@ public class TestFilterTransformer extends IcebergBaseTest {
 
   @Test
   public void testToFilterConditionsNull() {
-    assertEquals(Expressions.alwaysTrue().toString(), transformer.transform((Map<String, Object>) null).toString());
+    assertEquals(Expressions.alwaysTrue().toString(), transformer.transform((Map<MetastoreColumn, Object>) null).toString());
   }
 
   @Test
@@ -187,36 +197,111 @@ public class TestFilterTransformer extends IcebergBaseTest {
 
   @Test
   public void testToFilterConditionsOne() {
-    Map<String, Object> conditions = new HashMap<>();
-    conditions.put("a", 1);
+    Map<MetastoreColumn, Object> conditions = new LinkedHashMap<>();
+    conditions.put(MetastoreColumn.ROW_GROUP_INDEX, 1);
 
-    assertEquals(Expressions.equal("a", 1).toString(), transformer.transform(conditions).toString());
+    assertEquals(Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 1).toString(), transformer.transform(conditions).toString());
   }
 
   @Test
   public void testToFilterConditionsTwo() {
-    Map<String, Object> conditions = new HashMap<>();
-    conditions.put("a", 1);
-    conditions.put("b", 2);
+    Map<MetastoreColumn, Object> conditions = new LinkedHashMap<>();
+    conditions.put(MetastoreColumn.STORAGE_PLUGIN, "dfs");
+    conditions.put(MetastoreColumn.WORKSPACE, "tmp");
 
     Expression expected = Expressions.and(
-      Expressions.equal("a", 1), Expressions.equal("b", 2));
+      Expressions.equal(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+      Expressions.equal(MetastoreColumn.WORKSPACE.columnName(), "tmp"));
 
     assertEquals(expected.toString(), transformer.transform(conditions).toString());
   }
 
   @Test
   public void testToFilterConditionsFour() {
-    Map<String, Object> conditions = new HashMap<>();
-    conditions.put("a", 1);
-    conditions.put("b", 2);
-    conditions.put("c", 3);
-    conditions.put("d", 4);
+    Map<MetastoreColumn, Object> conditions = new LinkedHashMap<>();
+    conditions.put(MetastoreColumn.STORAGE_PLUGIN, "dfs");
+    conditions.put(MetastoreColumn.WORKSPACE, "tmp");
+    conditions.put(MetastoreColumn.TABLE_NAME, "nation");
+    conditions.put(MetastoreColumn.ROW_GROUP_INDEX, 4);
 
     Expression expected = Expressions.and(
-      Expressions.equal("a", 1), Expressions.equal("b", 2),
-      Expressions.equal("c", 3), Expressions.equal("d", 4));
+      Expressions.equal(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs"),
+      Expressions.equal(MetastoreColumn.WORKSPACE.columnName(), "tmp"),
+      Expressions.equal(MetastoreColumn.TABLE_NAME.columnName(), "nation"),
+      Expressions.equal(MetastoreColumn.ROW_GROUP_INDEX.columnName(), 4));
 
     assertEquals(expected.toString(), transformer.transform(conditions).toString());
   }
+
+  @Test
+  public void testToFilterMetadataTypesAll() {
+    Expression expected = Expressions.alwaysTrue();
+
+    Expression actual = transformer.transform(
+      Arrays.asList(MetadataType.PARTITION, MetadataType.FILE, MetadataType.ALL));
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testToFilterMetadataTypesOne() {
+    Expression expected = Expressions.equal(MetastoreColumn.METADATA_TYPE.columnName(), MetadataType.PARTITION.name());
+
+    Expression actual = transformer.transform(Collections.singletonList(MetadataType.PARTITION));
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testToFilterMetadataTypesSeveral() {
+    Expression expected = Expressions.in(MetastoreColumn.METADATA_TYPE.columnName(),
+      MetadataType.PARTITION.name(), MetadataType.FILE.name());
+
+    Expression actual = transformer.transform(
+      Arrays.asList(MetadataType.PARTITION, MetadataType.FILE));
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testCombineNone() {
+    Expression expected = Expressions.alwaysTrue();
+
+    Expression actual = transformer.combine();
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testCombineOne() {
+    Expression expected = Expressions.equal("a", 1);
+
+    Expression actual = transformer.combine(expected);
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testCombineTwo() {
+    Expression expected = Expressions.and(
+      Expressions.equal("a", 1), Expressions.equal("a", 2));
+
+    Expression actual = transformer.combine(
+      Expressions.equal("a", 1), Expressions.equal("a", 2));
+
+    assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testCombineFour() {
+    Expression expected = Expressions.and(
+      Expressions.equal("a", 1), Expressions.equal("a", 2),
+      Expressions.equal("a", 3), Expressions.equal("a", 4));
+
+    Expression actual = transformer.combine(
+      Expressions.equal("a", 1), Expressions.equal("a", 2),
+      Expressions.equal("a", 3), Expressions.equal("a", 4));
+
+    assertEquals(expected.toString(), actual.toString());
+  }
 }
diff --git a/metastore/metastore-api/README.md b/metastore/metastore-api/README.md
index 73e5831..c7dd701 100644
--- a/metastore/metastore-api/README.md
+++ b/metastore/metastore-api/README.md
@@ -75,9 +75,22 @@ Metastore component may or may not support properties depending on the implement
 If properties are supported, map with properties names and values is returned.
 otherwise empty map is returned. `Metadata#properties` is used to obtain properties information. 
 
-### Filter expression
+### Data filtering
+
+Metastore data can be read or deleted based on the filter expression and metadata types.
+
+#### Metadata types
+
+Each concrete Metastore component implementation supports specific metadata types
+which identify metadata inside Metastore component units. For example, for `tables`
+component, supported metadata types are `TABLE`, `SEGMENT`, `FILE`, `ROW_GROUP`, `PARTITION`.
+Metadata types are based on `org.apache.drill.metastore.metadata.MetadataType` enum names.
+
+Metadata types should be indicated during read or delete operations.
+If all metadata types are needed, the `MetadataType#ALL` metadata type can be specified.
+
+#### Filter expressions
 
-Metastore data can be read or deleted based on the filter expression.
 All filter expressions implement `FilterExpression` interface. 
 List of supported filter operators is indicated in `FilterExpression.Operator` enum.
 When filter expression is provided in read or delete operation, it's up to Metastore
@@ -87,8 +100,8 @@ For convenience, `FilterExpression.Visitor` can be implemented to traverse filte
 Filter expression can be simple and contain only one condition:
 
 ```
-FilterExpression storagePlugin = FilterExpression.equal("storagePlugin", "dfs");
-FilterExpression workspaces = FilterExpression.in("workspace", "root", "tmp");
+FilterExpression storagePlugin = FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs");
+FilterExpression workspaces = FilterExpression.in(MetastoreColumn.WORKSPACE, "root", "tmp");
 
 ```
 
@@ -96,10 +109,11 @@ Or it can be complex and contain several conditions combined with `AND` or `OR`
 
 ```
   FilterExpression filter = FilterExpression.and(
-    FilterExpression.equal("storagePlugin", "dfs"),
-    FilterExpression.in("workspace", "root", "tmp"));
+    FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+    FilterExpression.in(MetastoreColumn.WORKSPACE, "root", "tmp"));
   
   metastore.tables().read()
+    .metadataType(MetadataType.TABLE)
     .filters(filter)
     .execute();
 ```
@@ -110,6 +124,7 @@ SQL-like equivalent for the above operation is:
   select * from METASTORE.TABLES
   where storagePlugin = 'dfs'
   and workspace in ('root', 'tmp')
+  and metadataType = 'TABLE'
 ```
 
 ### Metastore Read
@@ -117,6 +132,7 @@ SQL-like equivalent for the above operation is:
 In order to provide read functionality each component must implement `Read`.
 During implementation component unit type must be indicated.
 `Metastore.Read#columns` allows to specify list of columns to be retrieved from the Metastore component.
+Columns are represented by `MetastoreColumn` enum values.
 `Metastore.Read#filter` allows to specify filter expression by which data will be retrieved.
 `Metastore.Read#execute` executes read operation and returns the results.
 Data is returned in a form of list of component metadata units, it is caller responsibility to transform received
@@ -128,8 +144,9 @@ for all tables in the `dfs` storage plugin the following code can be used:
 
 ```
   List<TableMetadataUnit> units = metastore.tables().read()
-    .columns("tableName", "lastModifiedTime")
-    .filter(FilterExpression.equal("storagePlugin", "dfs")
+    .metadataType(MetadataType.TABLE)
+    .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.LAST_MODIFIED_TIME)
+    .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
     .execute();
 ```
 
@@ -276,16 +293,18 @@ all segment information must be overwritten.
 
 #### Delete
 
-`Read#delete` deletes data from the Metastore component based on the provided filter expression.
+`Read#delete` deletes data from the Metastore component based on the provided delete operation.
+
+A delete operation consists of a delete filter and metadata types.
 
 Assume metadata for table `dfs.tmp.nation` already exists in the Metastore `tables` component
-and caller needs to delete it. First, deletion filter must be created:
+and the caller needs to delete it and all its metadata. First, the deletion filter must be created:
 
 ```
-    FilterExpression filter = FilterExpression.and(
-      FilterExpression.equal("storagePlugin", "dfs"),
-      FilterExpression.equal("workspace", "tmp"),
-      FilterExpression.equal("tableName", "nation"));
+    FilterExpression deleteFilter = FilterExpression.and(
+      FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
+      FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"),
+      FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"));
 ```
 
 Such filter can be also generated using `TableInfo` class:
@@ -297,25 +316,35 @@ Such filter can be also generated using `TableInfo` class:
       .name("nation")
       .build();
       
-    FilterExpression filter = tableInfo.toFilter();  
+    FilterExpression deleteFilter = tableInfo.toFilter();  
+```
+
+Then the metadata type should be indicated. Since all table metadata needs to be deleted,
+the `ALL` metadata type should be specified.
+
+```
+    Delete deleteOperation = Delete.builder()
+      .metadataType(MetadataType.ALL)
+      .filter(deleteFilter)
+      .build();
 ```
 
 Delete operation can be executed using the following code:
 
 ```
     metastore.tables().modify()
-      .delete(filter)
+      .delete(deleteOperation)
       .execute();
 ```
 
 #### Purge
 
-`Read#purge` deletes all data from the Metastore component.
+`Read#purge` deletes all data from the Metastore component. Purge is a terminal operation
+and cannot be used together with overwrite or delete.
 
 ```
     metastore.tables().modify()
-      .purge()
-      .execute();
+      .purge();
 ```
 
 #### Transactions
@@ -328,7 +357,7 @@ If Metastore implementation does not support transactions, all operations will b
     metastore.tables().modify()
       .overwrite(tableUnit1, segmentUnit1)
       .overwrite(tableUnit2, segmentUnit2)
-      .delete(table3Filter)
-      .delete(table4Filter)
+      .delete(deleteOperation1)
+      .delete(deleteOperation2)
       .execute();
 ```
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreColumn.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreColumn.java
new file mode 100644
index 0000000..d5f045c
--- /dev/null
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreColumn.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore;
+
+import org.apache.drill.metastore.exceptions.MetastoreException;
+
+import java.util.stream.Stream;
+
+/**
+ * Metastore column definition which contains all Metastore columns and their names
+ * to unify their usage in the code.
+ */
+public enum MetastoreColumn {
+
+  STORAGE_PLUGIN("storagePlugin"),
+  WORKSPACE("workspace"),
+  TABLE_NAME("tableName"),
+  OWNER("owner"),
+  TABLE_TYPE("tableType"),
+  METADATA_TYPE("metadataType"),
+  METADATA_KEY("metadataKey"),
+  LOCATION("location"),
+  INTERESTING_COLUMNS("interestingColumns"),
+  SCHEMA("schema"),
+  COLUMNS_STATISTICS("columnsStatistics"),
+  METADATA_STATISTICS("metadataStatistics"),
+  LAST_MODIFIED_TIME("lastModifiedTime"),
+  PARTITION_KEYS("partitionKeys"),
+  ADDITIONAL_METADATA("additionalMetadata"),
+  METADATA_IDENTIFIER("metadataIdentifier"),
+  COLUMN("column"),
+  LOCATIONS("locations"),
+  PARTITION_VALUES("partitionValues"),
+  PATH("path"),
+  ROW_GROUP_INDEX("rowGroupIndex"),
+  HOST_AFFINITY("hostAffinity");
+
+  private final String columnName;
+
+  MetastoreColumn(String columnName) {
+    this.columnName = columnName;
+  }
+
+  public String columnName() {
+    return columnName;
+  }
+
+  /**
+   * Looks up {@link MetastoreColumn} value for the given column name.
+   *
+   * @param columnName column name
+   * @return {@link MetastoreColumn} value
+   * @throws MetastoreException if {@link MetastoreColumn} value is not found
+   */
+  public static MetastoreColumn of(String columnName) {
+    return Stream.of(MetastoreColumn.values())
+      .filter(column -> column.columnName.equals(columnName))
+      // we don't expect duplicates by column name in the enum
+      .findAny()
+      .orElseThrow(() -> new MetastoreException(String.format("Column with name [%s] is absent.", columnName)));
+  }
+}
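
For illustration only (a minimal sketch, not part of the patch), the enum supports lookup
by column name and exposes the string name used by concrete Metastore implementations:

```
// Hypothetical usage sketch of the new MetastoreColumn enum.
MetastoreColumn column = MetastoreColumn.of("tableName");      // -> MetastoreColumn.TABLE_NAME
String columnName = MetastoreColumn.TABLE_NAME.columnName();   // -> "tableName"

// Filters are now built from enum values instead of string references.
FilterExpression filter = FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation");
```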
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java
index c81b17c..523ca39 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java
@@ -35,6 +35,13 @@ import java.lang.annotation.Target;
 public @interface MetastoreFieldDefinition {
 
   /**
+   * Indicates the Metastore column which can be used in select or filter operations.
+   *
+   * @return column enum value
+   */
+  MetastoreColumn column();
+
+  /**
    * Indicated metadata types to which field belongs to.
    *
    * @return array of metadata types field belongs to
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
index fbee2d8..2dee1a6 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.metadata.BaseTableMetadata;
 import org.apache.drill.metastore.metadata.FileMetadata;
@@ -41,13 +42,6 @@ import java.util.stream.Collectors;
  */
 public class BasicTablesRequests {
 
-  public static final String LAST_MODIFIED_TIME = "lastModifiedTime";
-  public static final String PATH = "path";
-  public static final String LOCATION = "location";
-  public static final String COLUMN = "column";
-  public static final String INTERESTING_COLUMNS = "interestingColumns";
-  public static final String PARTITION_KEYS = "partitionKeys";
-
   private final Tables tables;
 
   public BasicTablesRequests(Tables tables) {
@@ -72,8 +66,8 @@ public class BasicTablesRequests {
     RequestMetadata requestMetadata = RequestMetadata.builder()
       .tableInfo(tableInfo)
       .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
-      .metadataType(MetadataType.TABLE.name())
-      .requestColumns(LAST_MODIFIED_TIME)
+      .metadataType(MetadataType.TABLE)
+      .requestColumns(MetastoreColumn.LAST_MODIFIED_TIME)
       .build();
 
     long version = tables.metadata().version();
@@ -121,7 +115,7 @@ public class BasicTablesRequests {
     RequestMetadata requestMetadata = RequestMetadata.builder()
       .customFilter(filter)
       .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
-      .metadataType(MetadataType.TABLE.name())
+      .metadataType(MetadataType.TABLE)
       .requestColumns(TableMetadataUnit.SCHEMA.tableColumns())
       .build();
 
@@ -172,7 +166,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .locations(locations)
       .metadataKey(metadataKey)
-      .metadataType(MetadataType.SEGMENT.name())
+      .metadataType(MetadataType.SEGMENT)
       .requestColumns(TableMetadataUnit.SCHEMA.segmentColumns())
       .build();
 
@@ -202,7 +196,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .locations(locations)
       .column(column)
-      .metadataType(MetadataType.SEGMENT.name())
+      .metadataType(MetadataType.SEGMENT)
       .requestColumns(TableMetadataUnit.SCHEMA.segmentColumns())
       .build();
 
@@ -237,7 +231,7 @@ public class BasicTablesRequests {
         .tableInfo(tableInfo)
         .metadataKeys(keys)
         .identifiers(identifiers)
-        .metadataType(MetadataType.SEGMENT.name())
+        .metadataType(MetadataType.SEGMENT)
         .requestColumns(TableMetadataUnit.SCHEMA.segmentColumns())
         .build();
 
@@ -253,6 +247,7 @@ public class BasicTablesRequests {
    *   select * from METASTORE
    *   where storage = 'dfs' and workspace = 'tmp' and tableName = 'nation'
    *   and identifier in ('part_int=3', …)
+   *   and metadataType in ('SEGMENT', …)
    * </pre>
    *
    * @param tableInfo table information
@@ -267,8 +262,8 @@ public class BasicTablesRequests {
         .map(MetadataInfo::identifier)
         .collect(Collectors.toList());
 
-    List<String> metadataTypes = metadataInfos.stream()
-        .map(metadataInfo -> metadataInfo.type().name())
+    List<MetadataType> metadataTypes = metadataInfos.stream()
+        .map(MetadataInfo::type)
         .collect(Collectors.toList());
 
     RequestMetadata requestMetadata = RequestMetadata.builder()
@@ -303,7 +298,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKeys(metadataKeys)
       .column(column)
-      .metadataType(MetadataType.PARTITION.name())
+      .metadataType(MetadataType.PARTITION)
       .requestColumns(TableMetadataUnit.SCHEMA.partitionColumns())
       .build();
 
@@ -333,7 +328,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKey(metadataKey)
       .paths(paths)
-      .metadataType(MetadataType.FILE.name())
+      .metadataType(MetadataType.FILE)
       .requestColumns(TableMetadataUnit.SCHEMA.fileColumns())
       .build();
 
@@ -368,7 +363,7 @@ public class BasicTablesRequests {
         .tableInfo(tableInfo)
         .metadataKeys(keys)
         .identifiers(identifiers)
-        .metadataType(MetadataType.FILE.name())
+        .metadataType(MetadataType.FILE)
         .requestColumns(TableMetadataUnit.SCHEMA.fileColumns())
         .build();
 
@@ -400,7 +395,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKey(metadataKey)
       .path(path)
-      .metadataType(MetadataType.FILE.name())
+      .metadataType(MetadataType.FILE)
       .requestColumns(TableMetadataUnit.SCHEMA.fileColumns())
       .build();
 
@@ -430,7 +425,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKey(metadataKey)
       .path(path)
-      .metadataType(MetadataType.ROW_GROUP.name())
+      .metadataType(MetadataType.ROW_GROUP)
       .requestColumns(TableMetadataUnit.SCHEMA.rowGroupColumns())
       .build();
 
@@ -460,7 +455,7 @@ public class BasicTablesRequests {
         .tableInfo(tableInfo)
         .metadataKeys(metadataKeys)
         .paths(paths)
-        .metadataType(MetadataType.ROW_GROUP.name())
+        .metadataType(MetadataType.ROW_GROUP)
         .requestColumns(TableMetadataUnit.SCHEMA.rowGroupColumns())
         .build();
 
@@ -495,7 +490,7 @@ public class BasicTablesRequests {
         .tableInfo(tableInfo)
         .metadataKeys(keys)
         .identifiers(identifiers)
-        .metadataType(MetadataType.ROW_GROUP.name())
+        .metadataType(MetadataType.ROW_GROUP)
         .requestColumns(TableMetadataUnit.SCHEMA.rowGroupColumns())
         .build();
 
@@ -525,7 +520,7 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKeys(metadataKeys)
       .locations(locations)
-      .metadataTypes(Arrays.asList(MetadataType.SEGMENT.name(), MetadataType.FILE.name(), MetadataType.ROW_GROUP.name()))
+      .metadataTypes(MetadataType.SEGMENT, MetadataType.FILE, MetadataType.ROW_GROUP)
       .build();
 
     List<TableMetadataUnit> units = request(requestMetadata);
@@ -554,8 +549,8 @@ public class BasicTablesRequests {
       .tableInfo(tableInfo)
       .metadataKey(metadataKey)
       .locations(locations)
-      .metadataType(MetadataType.FILE.name())
-      .requestColumns(PATH, LAST_MODIFIED_TIME)
+      .metadataType(MetadataType.FILE)
+      .requestColumns(MetastoreColumn.PATH, MetastoreColumn.LAST_MODIFIED_TIME)
       .build();
 
     return request(requestMetadata).stream()
@@ -584,8 +579,8 @@ public class BasicTablesRequests {
     RequestMetadata requestMetadata = RequestMetadata.builder()
       .tableInfo(tableInfo)
       .locations(locations)
-      .metadataType(MetadataType.SEGMENT.name())
-      .requestColumns(MetadataInfo.METADATA_KEY, LAST_MODIFIED_TIME)
+      .metadataType(MetadataType.SEGMENT)
+      .requestColumns(MetastoreColumn.METADATA_KEY, MetastoreColumn.LAST_MODIFIED_TIME)
       .build();
 
     return request(requestMetadata).stream()
@@ -615,8 +610,8 @@ public class BasicTablesRequests {
     RequestMetadata requestMetadata = RequestMetadata.builder()
       .tableInfo(tableInfo)
       .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
-      .metadataType(MetadataType.TABLE.name())
-      .requestColumns(INTERESTING_COLUMNS, PARTITION_KEYS)
+      .metadataType(MetadataType.TABLE)
+      .requestColumns(MetastoreColumn.INTERESTING_COLUMNS, MetastoreColumn.PARTITION_KEYS)
       .build();
 
     return retrieveSingleElement(request(requestMetadata));
@@ -630,6 +625,7 @@ public class BasicTablesRequests {
    */
   public List<TableMetadataUnit> request(RequestMetadata requestMetadata) {
     return tables.read()
+      .metadataTypes(requestMetadata.metadataTypes())
       .filter(requestMetadata.filter())
       .columns(requestMetadata.columns())
       .execute();
@@ -656,26 +652,32 @@ public class BasicTablesRequests {
   }
 
   /**
-   * Request metadata holder that provides request filters and columns.
+   * Request metadata holder that provides request metadata types, filters and columns.
    * Combines given filters using {@link FilterExpression.Operator#AND} operator.
    * Supports only {@link FilterExpression.Operator#EQUAL} and {@link FilterExpression.Operator#IN}
    * operators for predefined filter references, for other cases custom filter can be used.
    */
   public static class RequestMetadata {
 
+    private List<MetadataType> metadataTypes;
     private final FilterExpression filter;
-    private final List<String> columns;
+    private final List<MetastoreColumn> columns;
 
-    private RequestMetadata(FilterExpression filter, List<String> columns) {
+    private RequestMetadata(List<MetadataType> metadataTypes, FilterExpression filter, List<MetastoreColumn> columns) {
+      this.metadataTypes = metadataTypes;
       this.filter = filter;
       this.columns = columns;
     }
 
+    public List<MetadataType> metadataTypes() {
+      return metadataTypes;
+    }
+
     public FilterExpression filter() {
       return filter;
     }
 
-    public List<String> columns() {
+    public List<MetastoreColumn> columns() {
       return columns;
     }
 
@@ -689,15 +691,29 @@ public class BasicTablesRequests {
       private String location;
       private List<String> locations;
       private String column;
-      private String metadataType;
-      private List<String> metadataTypes;
       private String metadataKey;
       private List<String> metadataKeys;
       private String path;
       private List<String> paths;
       private List<String> identifiers;
       private FilterExpression customFilter;
-      private final List<String> requestColumns = new ArrayList<>();
+      private List<MetadataType> metadataTypes = new ArrayList<>();
+      private final List<MetastoreColumn> requestColumns = new ArrayList<>();
+
+      public RequestMetadata.Builder metadataType(MetadataType metadataType) {
+        this.metadataTypes.add(metadataType);
+        return this;
+      }
+
+      public RequestMetadata.Builder metadataTypes(MetadataType... metadataTypes) {
+        this.metadataTypes.addAll(Arrays.asList(metadataTypes));
+        return this;
+      }
+
+      public RequestMetadata.Builder metadataTypes(List<MetadataType> metadataTypes) {
+        this.metadataTypes.addAll(metadataTypes);
+        return this;
+      }
 
       public RequestMetadata.Builder tableInfo(TableInfo tableInfo) {
         this.tableInfo = tableInfo;
@@ -719,16 +735,6 @@ public class BasicTablesRequests {
         return this;
       }
 
-      public RequestMetadata.Builder metadataType(String metadataType) {
-        this.metadataType = metadataType;
-        return this;
-      }
-
-      public RequestMetadata.Builder metadataTypes(List<String> metadataTypes) {
-        this.metadataTypes = metadataTypes;
-        return this;
-      }
-
       public RequestMetadata.Builder metadataKey(String metadataKey) {
         this.metadataKey = metadataKey;
         return this;
@@ -759,17 +765,17 @@ public class BasicTablesRequests {
         return this;
       }
 
-      public RequestMetadata.Builder requestColumns(List<String> requestColumns) {
+      public RequestMetadata.Builder requestColumns(List<MetastoreColumn> requestColumns) {
         this.requestColumns.addAll(requestColumns);
         return this;
       }
 
-      public RequestMetadata.Builder requestColumns(String... requestColumns) {
+      public RequestMetadata.Builder requestColumns(MetastoreColumn... requestColumns) {
         return requestColumns(Arrays.asList(requestColumns));
       }
 
       public RequestMetadata build() {
-        return new RequestMetadata(createFilter(), requestColumns);
+        return new RequestMetadata(metadataTypes, createFilter(), requestColumns);
       }
 
       private FilterExpression createFilter() {
@@ -777,16 +783,14 @@ public class BasicTablesRequests {
         if (tableInfo != null) {
           filters.add(tableInfo.toFilter());
         }
-        addFilter(LOCATION, location, filters);
-        addFilter(LOCATION, locations, filters);
-        addFilter(COLUMN, column, filters);
-        addFilter(MetadataInfo.METADATA_TYPE, metadataType, filters);
-        addFilter(MetadataInfo.METADATA_TYPE, metadataTypes, filters);
-        addFilter(MetadataInfo.METADATA_KEY, metadataKey, filters);
-        addFilter(MetadataInfo.METADATA_KEY, metadataKeys, filters);
-        addFilter(PATH, path, filters);
-        addFilter(PATH, paths, filters);
-        addFilter(MetadataInfo.METADATA_IDENTIFIER, identifiers, filters);
+        addFilter(MetastoreColumn.LOCATION, location, filters);
+        addFilter(MetastoreColumn.LOCATION, locations, filters);
+        addFilter(MetastoreColumn.COLUMN, column, filters);
+        addFilter(MetastoreColumn.METADATA_KEY, metadataKey, filters);
+        addFilter(MetastoreColumn.METADATA_KEY, metadataKeys, filters);
+        addFilter(MetastoreColumn.PATH, path, filters);
+        addFilter(MetastoreColumn.PATH, paths, filters);
+        addFilter(MetastoreColumn.METADATA_IDENTIFIER, identifiers, filters);
         if (customFilter != null) {
           filters.add(customFilter);
         }
@@ -809,11 +813,11 @@ public class BasicTablesRequests {
        * creates {@link FilterExpression.Operator#IN} filter, if List is empty, does nothing.
        * For all other cases, creates {@link FilterExpression.Operator#EQUAL} filter.
        *
-       * @param reference filter reference
+       * @param column Metastore column to which filter will be applied
        * @param value filter value
        * @param filters current list of filters
        */
-      private <T> void addFilter(String reference, T value, List<FilterExpression> filters) {
+      private <T> void addFilter(MetastoreColumn column, T value, List<FilterExpression> filters) {
         if (value == null) {
           return;
         }
@@ -823,11 +827,11 @@ public class BasicTablesRequests {
           if (list.isEmpty()) {
             return;
           }
-          filters.add(FilterExpression.in(reference, list));
+          filters.add(FilterExpression.in(column, list));
           return;
         }
 
-        filters.add(FilterExpression.equal(reference, value));
+        filters.add(FilterExpression.equal(column, value));
       }
     }
   }
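
As a rough sketch (assuming `tables` and `tableInfo` instances already exist), the typed
`RequestMetadata` builder shown above can be used as follows; metadata types and request
columns are now enums rather than strings:

```
// Hedged sketch: build a request for file-level path / lastModifiedTime metadata.
BasicTablesRequests requests = new BasicTablesRequests(tables);

BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
  .tableInfo(tableInfo)
  .metadataType(MetadataType.FILE)
  .requestColumns(MetastoreColumn.PATH, MetastoreColumn.LAST_MODIFIED_TIME)
  .build();

List<TableMetadataUnit> units = requests.request(requestMetadata);
```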
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
index 1d6ed84..da5d8fc 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.MetastoreFieldDefinition;
 import org.apache.drill.metastore.exceptions.MetastoreException;
 import org.apache.drill.metastore.metadata.MetadataType;
@@ -52,31 +53,31 @@ public class TableMetadataUnit {
 
   public static final Schema SCHEMA = Schema.of(TableMetadataUnit.class, Builder.class);
 
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String storagePlugin;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String workspace;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String tableName;
-  @MetastoreFieldDefinition(scopes = {TABLE}) private final String owner;
-  @MetastoreFieldDefinition(scopes = {TABLE}) private final String tableType;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String metadataType;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String metadataKey;
-  @MetastoreFieldDefinition(scopes = {TABLE, SEGMENT, FILE, ROW_GROUP}) private final String location;
-  @MetastoreFieldDefinition(scopes = {TABLE}) private final List<String> interestingColumns;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String schema;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final Map<String, String> columnsStatistics;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final List<String> metadataStatistics;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final Long lastModifiedTime;
-  @MetastoreFieldDefinition(scopes = {TABLE}) private final Map<String, String> partitionKeys;
-  @MetastoreFieldDefinition(scopes = {ALL}) private final String additionalMetadata;
-
-  @MetastoreFieldDefinition(scopes = {SEGMENT, FILE, ROW_GROUP, PARTITION}) private final String metadataIdentifier;
-  @MetastoreFieldDefinition(scopes = {SEGMENT, PARTITION}) private final String column;
-  @MetastoreFieldDefinition(scopes = {SEGMENT, PARTITION}) private final List<String> locations;
-  @MetastoreFieldDefinition(scopes = {SEGMENT, PARTITION}) private final List<String> partitionValues;
-
-  @MetastoreFieldDefinition(scopes = {SEGMENT, FILE, ROW_GROUP}) private final String path;
-
-  @MetastoreFieldDefinition(scopes = {ROW_GROUP}) private final Integer rowGroupIndex;
-  @MetastoreFieldDefinition(scopes = {ROW_GROUP}) private final Map<String, Float> hostAffinity;
+  @MetastoreFieldDefinition(column = MetastoreColumn.STORAGE_PLUGIN, scopes = {ALL}) private final String storagePlugin;
+  @MetastoreFieldDefinition(column = MetastoreColumn.WORKSPACE,scopes = {ALL}) private final String workspace;
+  @MetastoreFieldDefinition(column = MetastoreColumn.TABLE_NAME, scopes = {ALL}) private final String tableName;
+  @MetastoreFieldDefinition(column = MetastoreColumn.OWNER, scopes = {TABLE}) private final String owner;
+  @MetastoreFieldDefinition(column = MetastoreColumn.TABLE_TYPE, scopes = {TABLE}) private final String tableType;
+  @MetastoreFieldDefinition(column = MetastoreColumn.METADATA_TYPE, scopes = {ALL}) private final String metadataType;
+  @MetastoreFieldDefinition(column = MetastoreColumn.METADATA_KEY, scopes = {ALL}) private final String metadataKey;
+  @MetastoreFieldDefinition(column = MetastoreColumn.LOCATION, scopes = {TABLE, SEGMENT, FILE, ROW_GROUP}) private final String location;
+  @MetastoreFieldDefinition(column = MetastoreColumn.INTERESTING_COLUMNS, scopes = {TABLE}) private final List<String> interestingColumns;
+  @MetastoreFieldDefinition(column = MetastoreColumn.SCHEMA, scopes = {ALL}) private final String schema;
+  @MetastoreFieldDefinition(column = MetastoreColumn.COLUMNS_STATISTICS, scopes = {ALL}) private final Map<String, String> columnsStatistics;
+  @MetastoreFieldDefinition(column = MetastoreColumn.METADATA_STATISTICS, scopes = {ALL}) private final List<String> metadataStatistics;
+  @MetastoreFieldDefinition(column = MetastoreColumn.LAST_MODIFIED_TIME, scopes = {ALL}) private final Long lastModifiedTime;
+  @MetastoreFieldDefinition(column = MetastoreColumn.PARTITION_KEYS, scopes = {TABLE}) private final Map<String, String> partitionKeys;
+  @MetastoreFieldDefinition(column = MetastoreColumn.ADDITIONAL_METADATA, scopes = {ALL}) private final String additionalMetadata;
+
+  @MetastoreFieldDefinition(column = MetastoreColumn.METADATA_IDENTIFIER, scopes = {SEGMENT, FILE, ROW_GROUP, PARTITION}) private final String metadataIdentifier;
+  @MetastoreFieldDefinition(column = MetastoreColumn.COLUMN, scopes = {SEGMENT, PARTITION}) private final String column;
+  @MetastoreFieldDefinition(column = MetastoreColumn.LOCATIONS, scopes = {SEGMENT, PARTITION}) private final List<String> locations;
+  @MetastoreFieldDefinition(column = MetastoreColumn.PARTITION_VALUES, scopes = {SEGMENT, PARTITION}) private final List<String> partitionValues;
+
+  @MetastoreFieldDefinition(column = MetastoreColumn.PATH, scopes = {SEGMENT, FILE, ROW_GROUP}) private final String path;
+
+  @MetastoreFieldDefinition(column = MetastoreColumn.ROW_GROUP_INDEX, scopes = {ROW_GROUP}) private final Integer rowGroupIndex;
+  @MetastoreFieldDefinition(column = MetastoreColumn.HOST_AFFINITY, scopes = {ROW_GROUP}) private final Map<String, Float> hostAffinity;
 
   private TableMetadataUnit(Builder builder) {
     this.storagePlugin = builder.storagePlugin;
@@ -437,19 +438,19 @@ public class TableMetadataUnit {
    */
   public static class Schema {
 
-    private final List<String> tableColumns;
-    private final List<String> segmentColumns;
-    private final List<String> fileColumns;
-    private final List<String> rowGroupColumns;
-    private final List<String> partitionColumns;
+    private final List<MetastoreColumn> tableColumns;
+    private final List<MetastoreColumn> segmentColumns;
+    private final List<MetastoreColumn> fileColumns;
+    private final List<MetastoreColumn> rowGroupColumns;
+    private final List<MetastoreColumn> partitionColumns;
     private final Map<String, MethodHandle> unitGetters;
     private final Map<String, MethodHandle> unitBuilderSetters;
 
-    private Schema(List<String> tableColumns,
-                   List<String> segmentColumns,
-                   List<String> fileColumns,
-                   List<String> rowGroupColumns,
-                   List<String> partitionColumns,
+    private Schema(List<MetastoreColumn> tableColumns,
+                   List<MetastoreColumn> segmentColumns,
+                   List<MetastoreColumn> fileColumns,
+                   List<MetastoreColumn> rowGroupColumns,
+                   List<MetastoreColumn> partitionColumns,
                    Map<String, MethodHandle> unitGetters,
                    Map<String, MethodHandle> unitBuilderSetters) {
       this.tableColumns = tableColumns;
@@ -469,11 +470,11 @@ public class TableMetadataUnit {
      * are the same as annotated fields names.
      */
     public static Schema of(Class<?> unitClass, Class<?> builderClass) {
-      List<String> tableColumns = new ArrayList<>();
-      List<String> segmentColumns = new ArrayList<>();
-      List<String> fileColumns = new ArrayList<>();
-      List<String> rowGroupColumns = new ArrayList<>();
-      List<String> partitionColumns = new ArrayList<>();
+      List<MetastoreColumn> tableColumns = new ArrayList<>();
+      List<MetastoreColumn> segmentColumns = new ArrayList<>();
+      List<MetastoreColumn> fileColumns = new ArrayList<>();
+      List<MetastoreColumn> rowGroupColumns = new ArrayList<>();
+      List<MetastoreColumn> partitionColumns = new ArrayList<>();
       Map<String, MethodHandle> unitGetters = new HashMap<>();
       Map<String, MethodHandle> unitBuilderSetters = new HashMap<>();
 
@@ -486,30 +487,30 @@ public class TableMetadataUnit {
           continue;
         }
 
-        String name = field.getName();
+        MetastoreColumn column = definition.column();
         for (MetadataType scope : definition.scopes()) {
           switch (scope) {
             case TABLE:
-              tableColumns.add(name);
+              tableColumns.add(column);
               break;
             case SEGMENT:
-              segmentColumns.add(name);
+              segmentColumns.add(column);
               break;
             case FILE:
-              fileColumns.add(name);
+              fileColumns.add(column);
               break;
             case ROW_GROUP:
-              rowGroupColumns.add(name);
+              rowGroupColumns.add(column);
               break;
             case PARTITION:
-              partitionColumns.add(name);
+              partitionColumns.add(column);
               break;
             case ALL:
-              tableColumns.add(name);
-              segmentColumns.add(name);
-              fileColumns.add(name);
-              rowGroupColumns.add(name);
-              partitionColumns.add(name);
+              tableColumns.add(column);
+              segmentColumns.add(column);
+              fileColumns.add(column);
+              rowGroupColumns.add(column);
+              partitionColumns.add(column);
               break;
             default:
               throw new IllegalStateException(scope.name());
@@ -518,10 +519,12 @@ public class TableMetadataUnit {
 
         Class<?> type = field.getType();
         try {
-          MethodHandle getter = gettersLookup.findVirtual(unitClass, name, MethodType.methodType(type));
-          unitGetters.put(name, getter);
-          MethodHandle setter = settersLookup.findVirtual(builderClass, name, MethodType.methodType(builderClass, type));
-          unitBuilderSetters.put(name, setter);
+          String fieldName = field.getName();
+          String columnName = column.columnName();
+          MethodHandle getter = gettersLookup.findVirtual(unitClass, fieldName, MethodType.methodType(type));
+          unitGetters.put(columnName, getter);
+          MethodHandle setter = settersLookup.findVirtual(builderClass, fieldName, MethodType.methodType(builderClass, type));
+          unitBuilderSetters.put(columnName, setter);
         } catch (ReflectiveOperationException e) {
           throw new MetastoreException(String.format("Unable to init unit setter / getter method handlers " +
               "for unit [%s] and its builder [%s] classes", unitClass.getSimpleName(), builderClass.getSimpleName()), e);
@@ -532,23 +535,23 @@ public class TableMetadataUnit {
         unitGetters, unitBuilderSetters);
     }
 
-    public List<String> tableColumns() {
+    public List<MetastoreColumn> tableColumns() {
       return tableColumns;
     }
 
-    public List<String> segmentColumns() {
+    public List<MetastoreColumn> segmentColumns() {
       return segmentColumns;
     }
 
-    public List<String> fileColumns() {
+    public List<MetastoreColumn> fileColumns() {
       return fileColumns;
     }
 
-    public List<String> rowGroupColumns() {
+    public List<MetastoreColumn> rowGroupColumns() {
       return rowGroupColumns;
     }
 
-    public List<String> partitionColumns() {
+    public List<MetastoreColumn> partitionColumns() {
       return partitionColumns;
     }
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
similarity index 51%
copy from metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java
copy to metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
index c81b17c..d20467f 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreFieldDefinition.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
@@ -15,29 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.metastore;
+package org.apache.drill.metastore.components.tables;
 
 import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.util.Arrays;
+import java.util.List;
 
 /**
- * Annotation used to determine to which metadata types Metastore units fields belong.
- * Mainly used when obtaining field information using reflection.
- * Only fields annotated with this annotation are considered to be part of component
- * metadata unit, non-annotated fields will be ignored.
+ * Implementation of the {@link MetadataTypeValidator} interface which provides
+ * the list of supported metadata types for the Metastore Tables component.
  */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.FIELD})
-public @interface MetastoreFieldDefinition {
+public class TablesMetadataTypeValidator implements MetadataTypeValidator {
 
-  /**
-   * Indicated metadata types to which field belongs to.
-   *
-   * @return array of metadata types field belongs to
-   */
-  MetadataType[] scopes();
+  public static final TablesMetadataTypeValidator INSTANCE = new TablesMetadataTypeValidator();
+
+  private static final List<MetadataType> SUPPORTED_METADATA_TYPES = Arrays.asList(
+    MetadataType.ALL,
+    MetadataType.TABLE,
+    MetadataType.SEGMENT,
+    MetadataType.FILE,
+    MetadataType.ROW_GROUP,
+    MetadataType.PARTITION);
+
+  @Override
+  public List<MetadataType> supportedMetadataTypes() {
+    return SUPPORTED_METADATA_TYPES;
+  }
 }
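
A minimal sketch, assuming callers consult the validator before executing a read or
delete operation (the actual validation entry point is outside this hunk):

```
// Hedged sketch: check requested metadata types against the supported list.
List<MetadataType> requested = Arrays.asList(MetadataType.TABLE, MetadataType.SEGMENT);
boolean supported = TablesMetadataTypeValidator.INSTANCE
  .supportedMetadataTypes()
  .containsAll(requested);
```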
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/FilterExpression.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/FilterExpression.java
index 784a892..3448022 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/FilterExpression.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/FilterExpression.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.metastore.expressions;
 
+import org.apache.drill.metastore.MetastoreColumn;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Stream;
@@ -77,54 +79,54 @@ public interface FilterExpression {
     }
   }
 
-  static <T> FilterExpression equal(String reference, T value) {
-    return new SimplePredicate.Equal<>(reference, value);
+  static <T> FilterExpression equal(MetastoreColumn column, T value) {
+    return new SimplePredicate.Equal<>(column, value);
   }
 
-  static <T> FilterExpression notEqual(String reference, T value) {
-    return new SimplePredicate.NotEqual<>(reference, value);
+  static <T> FilterExpression notEqual(MetastoreColumn column, T value) {
+    return new SimplePredicate.NotEqual<>(column, value);
   }
 
-  static <T> FilterExpression lessThan(String reference, T value) {
-    return new SimplePredicate.LessThan<>(reference, value);
+  static <T> FilterExpression lessThan(MetastoreColumn column, T value) {
+    return new SimplePredicate.LessThan<>(column, value);
   }
 
-  static <T> FilterExpression lessThanOrEqual(String reference, T value) {
-    return new SimplePredicate.LessThanOrEqual<>(reference, value);
+  static <T> FilterExpression lessThanOrEqual(MetastoreColumn column, T value) {
+    return new SimplePredicate.LessThanOrEqual<>(column, value);
   }
 
-  static <T> FilterExpression greaterThan(String reference, T value) {
-    return new SimplePredicate.GreaterThan<>(reference, value);
+  static <T> FilterExpression greaterThan(MetastoreColumn column, T value) {
+    return new SimplePredicate.GreaterThan<>(column, value);
   }
 
-  static <T> FilterExpression greaterThanOrEqual(String reference, T value) {
-    return new SimplePredicate.GreaterThanOrEqual<>(reference, value);
+  static <T> FilterExpression greaterThanOrEqual(MetastoreColumn column, T value) {
+    return new SimplePredicate.GreaterThanOrEqual<>(column, value);
   }
 
-  static <T> FilterExpression in(String reference, List<T> values) {
-    return new ListPredicate.In<>(reference, values);
+  static <T> FilterExpression in(MetastoreColumn column, List<T> values) {
+    return new ListPredicate.In<>(column, values);
   }
 
   @SafeVarargs
-  static <T> FilterExpression in(String reference, T... values) {
-    return in(reference, Arrays.asList(values));
+  static <T> FilterExpression in(MetastoreColumn column, T... values) {
+    return in(column, Arrays.asList(values));
   }
 
-  static <T> FilterExpression notIn(String reference, List<T> values) {
-    return new ListPredicate.NotIn<>(reference, values);
+  static <T> FilterExpression notIn(MetastoreColumn column, List<T> values) {
+    return new ListPredicate.NotIn<>(column, values);
   }
 
   @SafeVarargs
-  static <T> FilterExpression notIn(String reference, T... values) {
-    return notIn(reference, Arrays.asList(values));
+  static <T> FilterExpression notIn(MetastoreColumn column, T... values) {
+    return notIn(column, Arrays.asList(values));
   }
 
-  static FilterExpression isNull(String reference) {
-    return new IsPredicate.IsNull(reference);
+  static FilterExpression isNull(MetastoreColumn column) {
+    return new IsPredicate.IsNull(column);
   }
 
-  static FilterExpression isNotNull(String reference) {
-    return new IsPredicate.IsNotNull(reference);
+  static FilterExpression isNotNull(MetastoreColumn column) {
+    return new IsPredicate.IsNotNull(column);
   }
 
   static FilterExpression not(FilterExpression expression) {
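
For reference, a short sketch (mirroring the README examples above) of a composite filter
built with the column-based factory methods:

```
// Hedged sketch: combining column-based predicates.
FilterExpression filter = FilterExpression.and(
  FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
  FilterExpression.in(MetastoreColumn.WORKSPACE, "root", "tmp"),
  FilterExpression.not(FilterExpression.isNull(MetastoreColumn.TABLE_NAME)));
```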
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/IsPredicate.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/IsPredicate.java
index d3a26e2..68c2112 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/IsPredicate.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/IsPredicate.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.metastore.expressions;
 
+import org.apache.drill.metastore.MetastoreColumn;
+
 import java.util.StringJoiner;
 
 /**
@@ -24,16 +26,16 @@ import java.util.StringJoiner;
  */
 public abstract class IsPredicate implements FilterExpression {
 
-  private final String reference;
+  private final MetastoreColumn column;
   private final Operator operator;
 
-  protected IsPredicate(String reference, Operator operator) {
-    this.reference = reference;
+  protected IsPredicate(MetastoreColumn column, Operator operator) {
+    this.column = column;
     this.operator = operator;
   }
 
-  public String reference() {
-    return reference;
+  public MetastoreColumn column() {
+    return column;
   }
 
   @Override
@@ -44,7 +46,7 @@ public abstract class IsPredicate implements FilterExpression {
   @Override
   public String toString() {
     return new StringJoiner(", ", IsPredicate.class.getSimpleName() + "[", "]")
-      .add("reference=" + reference)
+      .add("column=" + column)
       .add("operator=" + operator)
       .toString();
   }
@@ -55,8 +57,8 @@ public abstract class IsPredicate implements FilterExpression {
    */
   public static class IsNull extends IsPredicate {
 
-    public IsNull(String reference) {
-      super(reference, Operator.IS_NULL);
+    public IsNull(MetastoreColumn column) {
+      super(column, Operator.IS_NULL);
     }
 
     @Override
@@ -71,8 +73,8 @@ public abstract class IsPredicate implements FilterExpression {
    */
   public static class IsNotNull extends IsPredicate {
 
-    public IsNotNull(String reference) {
-      super(reference, Operator.IS_NOT_NULL);
+    public IsNotNull(MetastoreColumn column) {
+      super(column, Operator.IS_NOT_NULL);
     }
 
     @Override
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/ListPredicate.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/ListPredicate.java
index 18a4433..05f0033 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/ListPredicate.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/ListPredicate.java
@@ -17,28 +17,30 @@
  */
 package org.apache.drill.metastore.expressions;
 
+import org.apache.drill.metastore.MetastoreColumn;
+
 import java.util.List;
 import java.util.StringJoiner;
 
 /**
- * Indicates list predicate implementations which have reference and list of values.
+ * Indicates list predicate implementations which have a column and a list of values.
  *
  * @param <T> predicate value type
  */
 public abstract class ListPredicate<T> implements FilterExpression {
 
-  private final String reference;
+  private final MetastoreColumn column;
   private final Operator operator;
   private final List<T> values;
 
-  protected ListPredicate(String reference, Operator operator, List<T> values) {
-    this.reference = reference;
+  protected ListPredicate(MetastoreColumn column, Operator operator, List<T> values) {
+    this.column = column;
     this.operator = operator;
     this.values = values;
   }
 
-  public String reference() {
-    return reference;
+  public MetastoreColumn column() {
+    return column;
   }
 
   public List<T> values() {
@@ -53,7 +55,7 @@ public abstract class ListPredicate<T> implements FilterExpression {
   @Override
   public String toString() {
     return new StringJoiner(", ", ListPredicate.class.getSimpleName() + "[", "]")
-      .add("reference=" + reference)
+      .add("column=" + column)
       .add("operator=" + operator)
       .add("values=" + values)
       .toString();
@@ -67,8 +69,8 @@ public abstract class ListPredicate<T> implements FilterExpression {
    */
   public static class In<T> extends ListPredicate<T> {
 
-    public In(String reference, List<T> values) {
-      super(reference, Operator.IN, values);
+    public In(MetastoreColumn column, List<T> values) {
+      super(column, Operator.IN, values);
     }
 
     @Override
@@ -85,8 +87,8 @@ public abstract class ListPredicate<T> implements FilterExpression {
    */
   public static class NotIn<T> extends ListPredicate<T> {
 
-    public NotIn(String reference, List<T> values) {
-      super(reference, Operator.NOT_IN, values);
+    public NotIn(MetastoreColumn column, List<T> values) {
+      super(column, Operator.NOT_IN, values);
     }
 
     @Override
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/SimplePredicate.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/SimplePredicate.java
index d18e7cf..a0da356 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/SimplePredicate.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/expressions/SimplePredicate.java
@@ -17,27 +17,29 @@
  */
 package org.apache.drill.metastore.expressions;
 
+import org.apache.drill.metastore.MetastoreColumn;
+
 import java.util.StringJoiner;
 
 /**
- * Indicates simple predicate implementations which have reference and one value.
+ * Indicates simple predicate implementations which have a column and one value.
  *
  * @param <T> predicate value type
  */
 public abstract class SimplePredicate<T> implements FilterExpression {
 
-  private final String reference;
+  private final MetastoreColumn column;
   private final Operator operator;
   private final T value;
 
-  protected SimplePredicate(String reference, Operator operator, T value) {
-    this.reference = reference;
+  protected SimplePredicate(MetastoreColumn column, Operator operator, T value) {
+    this.column = column;
     this.operator = operator;
     this.value = value;
   }
 
-  public String reference() {
-    return reference;
+  public MetastoreColumn column() {
+    return column;
   }
 
   public T value() {
@@ -52,7 +54,7 @@ public abstract class SimplePredicate<T> implements FilterExpression {
   @Override
   public String toString() {
     return new StringJoiner(", ", SimplePredicate.class.getSimpleName() + "[", "]")
-      .add("reference=" + reference)
+      .add("column=" + column)
       .add("operator=" + operator)
       .add("value=" + value)
       .toString();
@@ -66,8 +68,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class Equal<T> extends SimplePredicate<T> {
 
-    public Equal(String reference, T value) {
-      super(reference, Operator.EQUAL, value);
+    public Equal(MetastoreColumn column, T value) {
+      super(column, Operator.EQUAL, value);
     }
 
     @Override
@@ -84,8 +86,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class NotEqual<T> extends SimplePredicate<T> {
 
-    public NotEqual(String reference, T value) {
-      super(reference, Operator.NOT_EQUAL, value);
+    public NotEqual(MetastoreColumn column, T value) {
+      super(column, Operator.NOT_EQUAL, value);
     }
 
     @Override
@@ -102,8 +104,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class LessThan<T> extends SimplePredicate<T> {
 
-    public LessThan(String reference, T value) {
-      super(reference, Operator.LESS_THAN, value);
+    public LessThan(MetastoreColumn column, T value) {
+      super(column, Operator.LESS_THAN, value);
     }
 
     @Override
@@ -120,8 +122,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class LessThanOrEqual<T> extends SimplePredicate<T> {
 
-    public LessThanOrEqual(String reference, T value) {
-      super(reference, Operator.LESS_THAN_OR_EQUAL, value);
+    public LessThanOrEqual(MetastoreColumn column, T value) {
+      super(column, Operator.LESS_THAN_OR_EQUAL, value);
     }
 
     @Override
@@ -138,8 +140,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class GreaterThan<T> extends SimplePredicate<T> {
 
-    public GreaterThan(String reference, T value) {
-      super(reference, Operator.GREATER_THAN, value);
+    public GreaterThan(MetastoreColumn column, T value) {
+      super(column, Operator.GREATER_THAN, value);
     }
 
     @Override
@@ -156,8 +158,8 @@ public abstract class SimplePredicate<T> implements FilterExpression {
    */
   public static class GreaterThanOrEqual<T> extends SimplePredicate<T> {
 
-    public GreaterThanOrEqual(String reference, T value) {
-      super(reference, Operator.GREATER_THAN_OR_EQUAL, value);
+    public GreaterThanOrEqual(MetastoreColumn column, T value) {
+      super(column, Operator.GREATER_THAN_OR_EQUAL, value);
     }
 
     @Override
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
index 1b41fea..038c599 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
@@ -40,9 +40,6 @@ public class MetadataInfo {
   public static final String GENERAL_INFO_KEY = "GENERAL_INFO";
   public static final String DEFAULT_SEGMENT_KEY = "DEFAULT_SEGMENT";
   public static final String DEFAULT_COLUMN_PREFIX = "_$SEGMENT_";
-  public static final String METADATA_TYPE = "metadataType";
-  public static final String METADATA_KEY = "metadataKey";
-  public static final String METADATA_IDENTIFIER = "metadataIdentifier";
 
   private final MetadataType type;
   private final String key;
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableInfo.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableInfo.java
index f9fb3f4..3d90e04 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableInfo.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableInfo.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
 import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
 import org.apache.drill.metastore.expressions.FilterExpression;
 
@@ -44,10 +45,6 @@ public class TableInfo {
       .owner(UNKNOWN)
       .build();
 
-  public static final String STORAGE_PLUGIN = "storagePlugin";
-  public static final String WORKSPACE = "workspace";
-  public static final String TABLE_NAME = "tableName";
-
   private final String storagePlugin;
   private final String workspace;
   private final String name;
@@ -88,9 +85,9 @@ public class TableInfo {
   }
 
   public FilterExpression toFilter() {
-    FilterExpression storagePluginFilter = FilterExpression.equal(STORAGE_PLUGIN, storagePlugin);
-    FilterExpression workspaceFilter = FilterExpression.equal(WORKSPACE, workspace);
-    FilterExpression tableNameFilter = FilterExpression.equal(TABLE_NAME, name);
+    FilterExpression storagePluginFilter = FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, storagePlugin);
+    FilterExpression workspaceFilter = FilterExpression.equal(MetastoreColumn.WORKSPACE, workspace);
+    FilterExpression tableNameFilter = FilterExpression.equal(MetastoreColumn.TABLE_NAME, name);
     return FilterExpression.and(storagePluginFilter, workspaceFilter, tableNameFilter);
   }
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
new file mode 100644
index 0000000..35abdc9
--- /dev/null
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.operate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract implementation of the {@link Modify<T>} interface which contains
+ * all boilerplate code for collecting overwrite units and delete operations.
+ * Metadata types of delete operations are validated
+ * before they are added to the list of pending delete operations.
+ *
+ * @param <T> Metastore metadata unit
+ */
+public abstract class AbstractModify<T> implements Modify<T> {
+
+  protected final List<T> overwriteUnits = new ArrayList<>();
+  protected final List<Delete> deletes = new ArrayList<>();
+
+  private final MetadataTypeValidator metadataTypeValidator;
+
+  protected AbstractModify(MetadataTypeValidator metadataTypeValidator) {
+    this.metadataTypeValidator = metadataTypeValidator;
+  }
+
+  @Override
+  public Modify<T> overwrite(List<T> units) {
+    overwriteUnits.addAll(units);
+    return this;
+  }
+
+  @Override
+  public Modify<T> delete(Delete delete) {
+    metadataTypeValidator.validate(delete.metadataTypes());
+    deletes.add(delete);
+    return this;
+  }
+}
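
A concrete Modify implementation built on this class only needs to supply execute() and purge(); the collected overwriteUnits and deletes are already validated and queued. A minimal sketch against a hypothetical in-memory list (illustrative only, not part of Drill):

  import java.util.List;

  import org.apache.drill.metastore.operate.AbstractModify;
  import org.apache.drill.metastore.operate.MetadataTypeValidator;

  public class InMemoryModify<T> extends AbstractModify<T> {

    private final List<T> store; // assumed in-memory backing list

    public InMemoryModify(MetadataTypeValidator validator, List<T> store) {
      super(validator); // delete metadata types are validated by AbstractModify#delete
      this.store = store;
    }

    @Override
    public void execute() {
      // a real implementation would translate each queued Delete's filter into removals;
      // this sketch only applies the collected overwrite units
      store.addAll(overwriteUnits);
    }

    @Override
    public void purge() {
      store.clear(); // terminal operation: removes all data
    }
  }
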
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
new file mode 100644
index 0000000..7fd03d6
--- /dev/null
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.operate;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract implementation of the {@link Read<T>} interface which contains
+ * all boilerplate code for collecting metadata types, columns and the filter condition.
+ * Given metadata types are also validated during execution.
+ *
+ * @param <T> Metastore metadata unit
+ */
+public abstract class AbstractRead<T> implements Read<T> {
+
+  protected final List<MetadataType> metadataTypes = new ArrayList<>();
+  protected final List<MetastoreColumn> columns = new ArrayList<>();
+  protected FilterExpression filter;
+
+  private final MetadataTypeValidator metadataTypeValidator;
+
+  protected AbstractRead(MetadataTypeValidator metadataTypeValidator) {
+    this.metadataTypeValidator = metadataTypeValidator;
+  }
+
+  @Override
+  public Read<T> metadataTypes(List<MetadataType> metadataTypes) {
+    this.metadataTypes.addAll(metadataTypes);
+    return this;
+  }
+
+  @Override
+  public Read<T> filter(FilterExpression filter) {
+    this.filter = filter;
+    return this;
+  }
+
+  @Override
+  public Read<T> columns(List<MetastoreColumn> columns) {
+    this.columns.addAll(columns);
+    return this;
+  }
+
+  @Override
+  public final List<T> execute() {
+    metadataTypeValidator.validate(metadataTypes);
+    return internalExecute();
+  }
+
+  protected abstract List<T> internalExecute();
+}
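
Correspondingly, a Read implementation extends this class and implements only internalExecute(); by the time it runs, execute() has already validated the requested metadata types. A rough sketch over a hypothetical backing list (illustrative only):

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.drill.metastore.operate.AbstractRead;
  import org.apache.drill.metastore.operate.MetadataTypeValidator;

  public class InMemoryRead<T> extends AbstractRead<T> {

    private final List<T> backingUnits; // assumed in-memory source of metadata units

    public InMemoryRead(MetadataTypeValidator validator, List<T> backingUnits) {
      super(validator);
      this.backingUnits = backingUnits;
    }

    @Override
    protected List<T> internalExecute() {
      // metadataTypes were validated in AbstractRead#execute();
      // a real implementation would apply 'filter' and project 'columns' here
      return new ArrayList<>(backingUnits);
    }
  }
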
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
new file mode 100644
index 0000000..5a53530
--- /dev/null
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.operate;
+
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringJoiner;
+
+/**
+ * Delete operation holder. It includes the filter by which Metastore data will be deleted
+ * and the list of metadata types to which the filter will be applied.
+ *
+ * Note: providing at least one metadata type is required.
+ * If the delete operation should be applied to all metadata types,
+ * {@link MetadataType#ALL} can be indicated.
+ */
+public class Delete {
+
+  private final List<MetadataType> metadataTypes;
+  private final FilterExpression filter;
+
+  private Delete(Builder builder) {
+    this.metadataTypes = builder.metadataTypes;
+    this.filter = builder.filter;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public List<MetadataType> metadataTypes() {
+    return metadataTypes;
+  }
+
+  public FilterExpression filter() {
+    return filter;
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", Delete.class.getSimpleName() + "[", "]")
+      .add("metadataTypes=" + metadataTypes)
+      .add("filter=" + filter)
+      .toString();
+  }
+
+  public static class Builder {
+    private final List<MetadataType> metadataTypes = new ArrayList<>();
+    private FilterExpression filter;
+
+    public Builder metadataTypes(List<MetadataType> metadataTypes) {
+      this.metadataTypes.addAll(metadataTypes);
+      return this;
+    }
+
+    public Builder metadataType(MetadataType... metadataTypes) {
+      return metadataTypes(Arrays.asList(metadataTypes));
+    }
+
+    public Builder filter(FilterExpression filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    public Delete build() {
+      return new Delete(this);
+    }
+  }
+}
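
For example, deleting the 'part_int=3' segment described in the Modify Javadoc below could be expressed with this builder roughly as follows (column values are illustrative):

  FilterExpression tableFilter = FilterExpression.and(
      FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
      FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"),
      FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"));

  Delete deleteSegment = Delete.builder()
      .metadataType(MetadataType.SEGMENT, MetadataType.FILE, MetadataType.ROW_GROUP, MetadataType.PARTITION)
      .filter(FilterExpression.and(tableFilter,
          FilterExpression.equal(MetastoreColumn.METADATA_KEY, "part_int=3")))
      .build();
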
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
new file mode 100644
index 0000000..96ba3dd
--- /dev/null
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.operate;
+
+import org.apache.drill.metastore.exceptions.MetastoreException;
+import org.apache.drill.metastore.metadata.MetadataType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Provides the list of supported metadata types for a concrete Metastore component unit
+ * and validates whether given metadata types are supported.
+ */
+public interface MetadataTypeValidator {
+
+  /**
+   * @return list of supported metadata types for concrete Metastore component unit
+   */
+  List<MetadataType> supportedMetadataTypes();
+
+  /**
+   * Validates that the given metadata types contain at least one metadata type
+   * and that all of them are supported.
+   *
+   * @param metadataTypes metadata types to be validated
+   * @throws MetastoreException if no metadata types were provided
+   *                            or given metadata types contain unsupported types
+   */
+  default void validate(List<MetadataType> metadataTypes) {
+    if (metadataTypes == null || metadataTypes.isEmpty()) {
+      throw new MetastoreException("Metadata type(s) must be indicated");
+    }
+
+    List<MetadataType> supportedMetadataTypes = supportedMetadataTypes();
+
+    List<MetadataType> unsupportedMetadataTypes = metadataTypes.stream()
+      .filter(metadataType -> !supportedMetadataTypes.contains(metadataType))
+      .collect(Collectors.toList());
+
+    if (!unsupportedMetadataTypes.isEmpty()) {
+      throw new MetastoreException("Unsupported metadata types are detected: " + unsupportedMetadataTypes);
+    }
+  }
+}
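
An implementing component typically exposes a singleton that lists what it can store; a hypothetical validator for a component limited to table- and segment-level metadata might look like the sketch below (TablesMetadataTypeValidator exercised in the tests further down is the real counterpart for the Tables component):

  import java.util.Arrays;
  import java.util.List;

  import org.apache.drill.metastore.metadata.MetadataType;
  import org.apache.drill.metastore.operate.MetadataTypeValidator;

  public enum ExampleMetadataTypeValidator implements MetadataTypeValidator {
    INSTANCE;

    @Override
    public List<MetadataType> supportedMetadataTypes() {
      return Arrays.asList(MetadataType.ALL, MetadataType.TABLE, MetadataType.SEGMENT);
    }
    // validate(List) is inherited: it rejects empty input and any type outside this list
  }
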
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
index 2b56662..dae8b4d 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.metastore.operate;
 
-import org.apache.drill.metastore.expressions.FilterExpression;
-
 import java.util.Arrays;
 import java.util.List;
 
@@ -31,7 +29,7 @@ import java.util.List;
 public interface Modify<T> {
 
   /**
-   * Adds overwrite operation for the Metastore component. For Metastore Tables compoenent,
+   * Adds overwrite operation for the Metastore component. For Metastore Tables component,
    * can be used to add new table data or replace partially / fully existing.
    * For example, if one of the table segments has changed,
    * all this segment data and table general information must be replaced with updated data.
@@ -48,27 +46,32 @@ public interface Modify<T> {
   }
 
   /**
-   * Adds delete operation for the Metastore component based on the given filter expression.
+   * Adds delete operation for the Metastore component based on the filter expression and metadata types.
    * For example for Metastore Tables component, if table has two segments
    * and data for one of the segments needs to be deleted.
    * Thus filter must be based on unique identifier of the table's top-level segment:
-   * storagePlugin = 'dfs' and workspace = 'tmp' and tableName = 'nation' and metadataKey = 'part_int=3'
+   * storagePlugin = 'dfs' and workspace = 'tmp' and tableName = 'nation' and metadataKey = 'part_int=3'.
+   * Metadata types should include all metadata types present in this segment:
+   * SEGMENT, FILE, ROW_GROUP, PARTITION.
+   * If all table metadata should be deleted, the ALL metadata type can be indicated along with the unique table identifier:
+   * storagePlugin = 'dfs' and workspace = 'tmp' and tableName = 'nation'.
    *
-   * @param filter filter expression
+   * @param delete delete operation holder
    * @return current instance of Modify interface implementation
    */
-  Modify<T> delete(FilterExpression filter);
-
-  /**
-   * Deletes all data from the Metastore component.
-   *
-   * @return current instance of Modify interface implementation
-   */
-  Modify<T> purge();
+  Modify<T> delete(Delete delete);
 
   /**
    * Executes list of provided metastore operations in one transaction if Metastore implementation
    * supports transactions, otherwise executes operations consecutively.
    */
   void execute();
+
+  /**
+   * Deletes all data from the Metastore component.
+   * Note: this is a terminal operation that does not take into account
+   * any previously set delete operations or overwrite units;
+   * it simply deletes all data.
+   */
+  void purge();
 }
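
With this change, callers wrap the filter and the affected metadata types into a Delete holder instead of passing a bare filter. A sketch of deleting all metadata of one table, assuming a 'metastore' handle to the Metastore Tables component:

  metastore.tables().modify()
      .delete(Delete.builder()
          .metadataType(MetadataType.ALL)
          .filter(FilterExpression.and(
              FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
              FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"),
              FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation")))
          .build())
      .execute();
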
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
index 92e5749..97bd79c 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.metastore.operate;
 
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
 
 import java.util.Arrays;
 import java.util.List;
@@ -31,6 +33,20 @@ import java.util.List;
 public interface Read<T> {
 
   /**
+   * Provides list of metadata types to be read.
+   * Note: providing at least one metadata type is required.
+   * If all metadata types should be read, {@link MetadataType#ALL} can be passed.
+   *
+   * @param metadataTypes list of metadata types
+   * @return current instance of Read interface implementation
+   */
+  Read<T> metadataTypes(List<MetadataType> metadataTypes);
+
+  default Read<T> metadataType(MetadataType... metadataType) {
+    return metadataTypes(Arrays.asList(metadataType));
+  }
+
+  /**
    * Provides filter expression by which metastore component data will be filtered.
    * If filter expression is not indicated, all Metastore component data will be read.
    *
@@ -48,15 +64,15 @@ public interface Read<T> {
    * @param columns list of columns to be read from Metastore component
    * @return current instance of Read interface implementation
    */
-  Read<T> columns(List<String> columns);
+  Read<T> columns(List<MetastoreColumn> columns);
 
-  default Read<T> columns(String... columns) {
+  default Read<T> columns(MetastoreColumn... columns) {
     return columns(Arrays.asList(columns));
   }
 
   /**
    * Executes read operation from Metastore component, returns obtained result in a form
-   * of list of component units  which later can be transformed into suitable format.
+   * of list of component units which later can be transformed into suitable format.
    *
    * @return list of component units
    */
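
A read now always states which metadata types it wants. For instance, fetching only segment-level units of one table with a projected column set might look like the following, again assuming a 'metastore' handle and the Tables component unit type TableMetadataUnit:

  List<TableMetadataUnit> units = metastore.tables().read()
      .metadataType(MetadataType.SEGMENT)
      .filter(FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"))
      .columns(MetastoreColumn.METADATA_KEY, MetastoreColumn.LOCATION)
      .execute();
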
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
index 8278816..1679cbe 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
@@ -18,8 +18,9 @@
 package org.apache.drill.metastore.components.tables;
 
 import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
-import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.test.BaseTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,7 +48,7 @@ public class TestBasicTablesRequests extends BaseTest {
 
   @Test
   public void testRequestMetadataWithRequestColumns() {
-    List<String> requestColumns = Arrays.asList("col1", "col2");
+    List<MetastoreColumn> requestColumns = Arrays.asList(MetastoreColumn.STORAGE_PLUGIN, MetastoreColumn.SCHEMA);
     BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
       .column("col")
       .metadataKeys(Arrays.asList("a", "b", "c"))
@@ -80,7 +81,7 @@ public class TestBasicTablesRequests extends BaseTest {
       .column("col")
       .build();
 
-    FilterExpression expected = FilterExpression.equal(BasicTablesRequests.COLUMN, "col");
+    FilterExpression expected = FilterExpression.equal(MetastoreColumn.COLUMN, "col");
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
   }
@@ -93,8 +94,8 @@ public class TestBasicTablesRequests extends BaseTest {
       .build();
 
     FilterExpression expected = FilterExpression.and(
-      FilterExpression.equal(BasicTablesRequests.LOCATION, "/tmp/dir"),
-      FilterExpression.equal(BasicTablesRequests.COLUMN, "col"));
+      FilterExpression.equal(MetastoreColumn.LOCATION, "/tmp/dir"),
+      FilterExpression.equal(MetastoreColumn.COLUMN, "col"));
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
   }
@@ -110,8 +111,8 @@ public class TestBasicTablesRequests extends BaseTest {
       .build();
 
     FilterExpression expected = FilterExpression.and(
-      FilterExpression.in(BasicTablesRequests.LOCATION, locations),
-      FilterExpression.in(MetadataInfo.METADATA_KEY, metadataKeys));
+      FilterExpression.in(MetastoreColumn.LOCATION, locations),
+      FilterExpression.in(MetastoreColumn.METADATA_KEY, metadataKeys));
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
   }
@@ -120,7 +121,7 @@ public class TestBasicTablesRequests extends BaseTest {
   public void testRequestMetadataWithCustomFilter() {
     String column = "col";
     List<String> metadataKeys = Arrays.asList("a", "b", "c");
-    FilterExpression customFilter = FilterExpression.equal("custom", true);
+    FilterExpression customFilter = FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs");
 
     BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
       .column(column)
@@ -129,10 +130,30 @@ public class TestBasicTablesRequests extends BaseTest {
       .build();
 
     FilterExpression expected = FilterExpression.and(
-      FilterExpression.equal(BasicTablesRequests.COLUMN, column),
-      FilterExpression.in(MetadataInfo.METADATA_KEY, metadataKeys),
+      FilterExpression.equal(MetastoreColumn.COLUMN, column),
+      FilterExpression.in(MetastoreColumn.METADATA_KEY, metadataKeys),
       customFilter);
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
   }
+
+  @Test
+  public void testRequestMetadataWithMetadataType() {
+    BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
+      .metadataType(MetadataType.TABLE)
+      .build();
+
+    assertEquals(1, requestMetadata.metadataTypes().size());
+    assertEquals(MetadataType.TABLE, requestMetadata.metadataTypes().iterator().next());
+  }
+
+  @Test
+  public void testRequestMetadataWithMetadataTypes() {
+    BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
+      .metadataTypes(MetadataType.TABLE, MetadataType.SEGMENT)
+      .metadataTypes(Arrays.asList(MetadataType.PARTITION, MetadataType.FILE))
+      .build();
+
+    assertEquals(4, requestMetadata.metadataTypes().size());
+  }
 }
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java
new file mode 100644
index 0000000..80b1d61
--- /dev/null
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.components.tables;
+
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.metastore.exceptions.MetastoreException;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.test.BaseTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(MetastoreTest.class)
+public class TestTablesMetadataTypeValidator extends BaseTest {
+
+  @Test
+  public void testValidType() {
+    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.ALL));
+    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.TABLE));
+  }
+
+  @Test
+  public void testValidTypes() {
+    TablesMetadataTypeValidator.INSTANCE.validate(Arrays.asList(
+      MetadataType.TABLE,
+      MetadataType.SEGMENT,
+      MetadataType.FILE,
+      MetadataType.ROW_GROUP,
+      MetadataType.PARTITION));
+  }
+
+  @Test
+  public void testInvalidType() {
+    try {
+      TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.NONE));
+      fail();
+    } catch (MetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unsupported metadata types are detected"));
+    }
+  }
+
+  @Test
+  public void testValidAndInvalidTypes() {
+    try {
+      TablesMetadataTypeValidator.INSTANCE.validate(Arrays.asList(
+        MetadataType.TABLE,
+        MetadataType.ALL,
+        MetadataType.NONE,
+        MetadataType.VIEW));
+      fail();
+    } catch (MetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unsupported metadata types are detected"));
+    }
+  }
+}


[drill] 01/04: DRILL-6604: Upgrade Drill Hive client to Hive3.1 version

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e27ef55f4ca1392ff3c29b16f6ad37f0081f72a7
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Sun Mar 22 23:42:59 2020 +0200

    DRILL-6604: Upgrade Drill Hive client to Hive3.1 version
    
    closes #2038
---
 contrib/storage-hive/core/pom.xml                  | 30 +++++++++++++++--
 .../storage-hive/core/src/main/codegen/config.fmpp |  3 +-
 .../main/codegen/{config.fmpp => configHive3.fmpp} |  3 +-
 .../{config.fmpp => data/Hive2DateTypes.tdd}       | 24 ++++++++++---
 .../{config.fmpp => data/Hive3DateTypes.tdd}       | 24 ++++++++++---
 .../core/src/main/codegen/data/HiveTypes.tdd       | 14 --------
 .../codegen/templates/ObjectInspectorHelper.java   | 24 +++++++++----
 .../main/codegen/templates/ObjectInspectors.java   | 37 ++++++++++++++------
 .../exec/store/hive/HiveMetadataProvider.java      |  4 +--
 .../drill/exec/store/hive/HiveUtilities.java       | 39 ++++++++++++----------
 .../client/DrillHiveMetaStoreClientFactory.java    | 36 ++++++++++++++++++--
 .../hive/writers/primitive/HiveDateWriter.java     | 16 +++++----
 .../writers/primitive/HiveTimestampWriter.java     | 16 +++++----
 .../log4j/util/Strings.java}                       | 27 ++++++---------
 .../apache/drill/exec/hive/HiveTestFixture.java    |  2 ++
 .../apache/drill/exec/hive/HiveTestUtilities.java  | 31 ++++++++---------
 .../hive/BaseTestHiveImpersonation.java            | 28 +++++++++++++---
 .../exec/store/hive/HiveTestDataGenerator.java     |  6 ++--
 exec/rpc/pom.xml                                   | 10 ++----
 pom.xml                                            | 15 +++++++--
 20 files changed, 262 insertions(+), 127 deletions(-)

diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index 50a782b..60603ee 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -30,6 +30,9 @@
   <artifactId>drill-storage-hive-core</artifactId>
   <packaging>jar</packaging>
   <name>contrib/hive-storage-plugin/core</name>
+  <properties>
+    <freemarker.conf.file>src/main/codegen/configHive3.fmpp</freemarker.conf.file>
+  </properties>
 
   <dependencies>
     <dependency>
@@ -61,6 +64,14 @@
           <groupId>commons-codec</groupId>
           <artifactId>commons-codec</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-1.2-api</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -146,7 +157,7 @@
     <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-core</artifactId>
-      <version>2.3.2</version>
+      <version>${hive.version}</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
@@ -189,6 +200,18 @@
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-web</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
@@ -208,7 +231,7 @@
               <goal>generate</goal>
             </goals>
             <configuration>
-              <config>src/main/codegen/config.fmpp</config>
+              <config>${freemarker.conf.file}</config>
               <output>${project.build.directory}/generated-sources</output>
               <templates>src/main/codegen/templates</templates>
             </configuration>
@@ -220,6 +243,9 @@
   <profiles>
     <profile>
       <id>mapr</id>
+      <properties>
+        <freemarker.conf.file>src/main/codegen/config.fmpp</freemarker.conf.file>
+      </properties>
       <build>
         <plugins>
           <plugin>
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/config.fmpp
index 731d67d..a460708 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/config.fmpp
@@ -17,7 +17,8 @@
 #
 
 data: {
-    drillOI:tdd(../data/HiveTypes.tdd)
+    drillDataType:  tdd(../data/Hive2DateTypes.tdd),
+    drillOI:        tdd(../data/HiveTypes.tdd)
 }
 freemarkerLinks: {
     includes: includes/
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/configHive3.fmpp
similarity index 89%
copy from contrib/storage-hive/core/src/main/codegen/config.fmpp
copy to contrib/storage-hive/core/src/main/codegen/configHive3.fmpp
index 731d67d..cc36fc4 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/configHive3.fmpp
@@ -17,7 +17,8 @@
 #
 
 data: {
-    drillOI:tdd(../data/HiveTypes.tdd)
+    drillDataType:  tdd(../data/Hive3DateTypes.tdd),
+    drillOI:        tdd(../data/HiveTypes.tdd)
 }
 freemarkerLinks: {
     includes: includes/
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/data/Hive2DateTypes.tdd
similarity index 61%
copy from contrib/storage-hive/core/src/main/codegen/config.fmpp
copy to contrib/storage-hive/core/src/main/codegen/data/Hive2DateTypes.tdd
index 731d67d..af42ee1 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/data/Hive2DateTypes.tdd
@@ -16,9 +16,23 @@
 # limitations under the License.
 #
 
-data: {
-    drillOI:tdd(../data/HiveTypes.tdd)
-}
-freemarkerLinks: {
-    includes: includes/
+{
+  map: [
+    {
+      hiveType: "DATE",
+      hiveOI: "DateObjectInspector",
+      javaType: "java.sql.Date",
+      writableType: "org.apache.hadoop.hive.serde2.io.DateWritable",
+      drillType: "Date",
+      needOIForDrillType: true
+    },
+    {
+      hiveType: "TIMESTAMP",
+      hiveOI: "TimestampObjectInspector",
+      javaType: "java.sql.Timestamp",
+      writableType: "org.apache.hadoop.hive.serde2.io.TimestampWritable",
+      drillType: "TimeStamp",
+      needOIForDrillType: true
+    }
+  ]
 }
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/data/Hive3DateTypes.tdd
similarity index 58%
copy from contrib/storage-hive/core/src/main/codegen/config.fmpp
copy to contrib/storage-hive/core/src/main/codegen/data/Hive3DateTypes.tdd
index 731d67d..2c873e8 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/data/Hive3DateTypes.tdd
@@ -16,9 +16,23 @@
 # limitations under the License.
 #
 
-data: {
-    drillOI:tdd(../data/HiveTypes.tdd)
-}
-freemarkerLinks: {
-    includes: includes/
+{
+  map: [
+    {
+      hiveType: "DATE",
+      hiveOI: "DateObjectInspector",
+      javaType: "org.apache.hadoop.hive.common.type.Date",
+      writableType: "org.apache.hadoop.hive.serde2.io.DateWritableV2",
+      drillType: "Date",
+      needOIForDrillType: true
+    },
+    {
+      hiveType: "TIMESTAMP",
+      hiveOI: "TimestampObjectInspector",
+      javaType: "org.apache.hadoop.hive.common.type.Timestamp",
+      writableType: "org.apache.hadoop.hive.serde2.io.TimestampWritableV2",
+      drillType: "TimeStamp",
+      needOIForDrillType: true
+    }
+  ]
 }
diff --git a/contrib/storage-hive/core/src/main/codegen/data/HiveTypes.tdd b/contrib/storage-hive/core/src/main/codegen/data/HiveTypes.tdd
index 3ea9dbb..0133dcd 100644
--- a/contrib/storage-hive/core/src/main/codegen/data/HiveTypes.tdd
+++ b/contrib/storage-hive/core/src/main/codegen/data/HiveTypes.tdd
@@ -96,25 +96,11 @@
       needOIForDrillType: true
     },
     {
-      hiveType: "TIMESTAMP",
-      hiveOI: "TimestampObjectInspector",
-      javaType: "java.sql.Timestamp",
-      drillType: "TimeStamp",
-      needOIForDrillType: true
-    },
-    {
       hiveType: "DECIMAL",
       hiveOI: "HiveDecimalObjectInspector",
       javaType: "org.apache.hadoop.hive.common.type.HiveDecimal",
       drillType: "VarDecimal",
       needOIForDrillType: true
-    },
-    {
-      hiveType: "DATE",
-      hiveOI: "DateObjectInspector",
-      javaType: "java.sql.Date",
-      drillType: "Date",
-      needOIForDrillType: true
     }
   ]
 }
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectorHelper.java b/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectorHelper.java
index 6b91c41..3e1ec4e 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectorHelper.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectorHelper.java
@@ -48,7 +48,8 @@ public class ObjectInspectorHelper {
   private static Multimap<MinorType, Class> OIMAP_REQUIRED = ArrayListMultimap.create();
   private static Multimap<MinorType, Class> OIMAP_OPTIONAL = ArrayListMultimap.create();
   static {
-<#list drillOI.map as entry>
+<#assign entries = drillDataType.map + drillOI.map />
+<#list entries as entry>
     <#if entry.needOIForDrillType == true>
     OIMAP_REQUIRED.put(MinorType.${entry.drillType?upper_case}, Drill${entry.drillType}${entry.hiveOI}.Required.class);
     OIMAP_OPTIONAL.put(MinorType.${entry.drillType?upper_case}, Drill${entry.drillType}${entry.hiveOI}.Optional.class);
@@ -91,7 +92,8 @@ public class ObjectInspectorHelper {
       case PRIMITIVE: {
         PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
         switch(poi.getPrimitiveCategory()) {
-<#list drillOI.map as entry>
+<#assign entries = drillDataType.map + drillOI.map />
+<#list entries as entry>
           case ${entry.hiveType}:{
             JType holderClass = TypeHelper.getHolderType(m, returnType, TypeProtos.DataMode.OPTIONAL);
             block.assign(returnValueHolder, JExpr._new(holderClass));
@@ -126,7 +128,8 @@ public class ObjectInspectorHelper {
 
   private static Map<PrimitiveCategory, MinorType> TYPE_HIVE2DRILL = new HashMap<>();
   static {
-<#list drillOI.map as entry>
+<#assign entries = drillDataType.map + drillOI.map />
+<#list entries as entry>
     TYPE_HIVE2DRILL.put(PrimitiveCategory.${entry.hiveType}, MinorType.${entry.drillType?upper_case});
 </#list>
   }
@@ -156,7 +159,8 @@ public class ObjectInspectorHelper {
       case PRIMITIVE: {
         PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
         switch(poi.getPrimitiveCategory()) {
-<#list drillOI.map as entry>
+<#assign entries = drillDataType.map + drillOI.map />
+<#list entries as entry>
           case ${entry.hiveType}:{
             JConditional jc = block._if(returnValue.eq(JExpr._null()));
             jc._then().assign(returnValueHolder.ref("isSet"), JExpr.lit(0));
@@ -201,17 +205,25 @@ public class ObjectInspectorHelper {
             jnullif._else().add(returnValueHolder.ref("buffer").invoke("setIndex").arg(JExpr.lit(0)).arg(data.ref("length")));
 
           <#elseif entry.hiveType == "TIMESTAMP">
-            JVar tsVar = jc._else().decl(m.directClass(java.sql.Timestamp.class.getCanonicalName()), "ts",
+            JVar tsVar = jc._else().decl(m.directClass(${entry.javaType}.class.getCanonicalName()), "ts",
               castedOI.invoke("getPrimitiveJavaObject").arg(returnValue));
+              <#if entry.javaType == "org.apache.hadoop.hive.common.type.Timestamp">
+            jc._else().assign(returnValueHolder.ref("value"), tsVar.invoke("toEpochMilli"));
+              <#else>
             // Bringing relative timestamp value without timezone info to timestamp value in UTC, since Drill keeps date-time values in UTC
             JVar localDateTimeVar = jc._else().decl(m.directClass(org.joda.time.LocalDateTime.class.getCanonicalName()), "localDateTime",
                 JExpr._new(m.directClass(org.joda.time.LocalDateTime.class.getCanonicalName())).arg(tsVar));
             jc._else().assign(returnValueHolder.ref("value"), localDateTimeVar.invoke("toDateTime")
                 .arg(m.directClass(org.joda.time.DateTimeZone.class.getCanonicalName()).staticRef("UTC")).invoke("getMillis"));
+              </#if>
           <#elseif entry.hiveType == "DATE">
-            JVar dVar = jc._else().decl(m.directClass(java.sql.Date.class.getCanonicalName()), "d",
+            JVar dVar = jc._else().decl(m.directClass(${entry.javaType}.class.getCanonicalName()), "d",
               castedOI.invoke("getPrimitiveJavaObject").arg(returnValue));
+              <#if entry.javaType == "org.apache.hadoop.hive.common.type.Date">
+            jc._else().assign(returnValueHolder.ref("value"), dVar.invoke("toEpochMilli"));
+              <#else>
             jc._else().assign(returnValueHolder.ref("value"), dVar.invoke("getTime"));
+              </#if>
           <#else>
             jc._else().assign(returnValueHolder.ref("value"),
               castedOI.invoke("get").arg(returnValue));
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectors.java b/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectors.java
index a539b7f..2dd6ce2 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectors.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/ObjectInspectors.java
@@ -17,7 +17,8 @@
  */
 <@pp.dropOutputFile />
 
-<#list drillOI.map as entry>
+<#assign entries = drillDataType.map + drillOI.map />
+<#list entries as entry>
 <#if entry.needOIForDrillType == true>
 <@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/hive/Drill${entry.drillType}${entry.hiveOI}.java" />
 
@@ -32,12 +33,10 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BooleanWritable;
@@ -204,7 +203,7 @@ public class Drill${entry.drillType}${entry.hiveOI} {
 
 <#elseif entry.drillType == "TimeStamp">
     @Override
-    public java.sql.Timestamp getPrimitiveJavaObject(Object o) {
+    public ${entry.javaType} getPrimitiveJavaObject(Object o) {
     <#if mode == "Optional">
       if (o == null) {
         return null;
@@ -213,14 +212,18 @@ public class Drill${entry.drillType}${entry.hiveOI} {
     <#else>
       final TimeStampHolder h = (TimeStampHolder) o;
     </#if>
+    <#if entry.javaType == "org.apache.hadoop.hive.common.type.Timestamp">
+      return ${entry.javaType}.ofEpochMilli(h.value);
+    <#else>
       org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(h.value, org.joda.time.DateTimeZone.UTC);
       // use "toDate()" to get java.util.Date object with exactly the same fields as this Joda date-time.
       // See more in Javadoc for "LocalDateTime#toDate()"
-      return new java.sql.Timestamp(dateTime.toDate().getTime());
+      return new ${entry.javaType}(dateTime.toDate().getTime());
+    </#if>
     }
 
     @Override
-    public TimestampWritable getPrimitiveWritableObject(Object o) {
+    public ${entry.writableType} getPrimitiveWritableObject(Object o) {
     <#if mode == "Optional">
       if (o == null) {
         return null;
@@ -229,15 +232,19 @@ public class Drill${entry.drillType}${entry.hiveOI} {
     <#else>
       final TimeStampHolder h = (TimeStampHolder) o;
     </#if>
+    <#if entry.javaType == "org.apache.hadoop.hive.common.type.Timestamp">
+      return new ${entry.writableType}(${entry.javaType}.ofEpochMilli(h.value));
+    <#else>
       org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(h.value, org.joda.time.DateTimeZone.UTC);
       // use "toDate()" to get java.util.Date object with exactly the same fields as this Joda date-time.
       // See more in Javadoc for "LocalDateTime#toDate()"
-      return new TimestampWritable(new java.sql.Timestamp(dateTime.toDate().getTime()));
+      return new ${entry.writableType}(new ${entry.javaType}(dateTime.toDate().getTime()));
+    </#if>
     }
 
 <#elseif entry.drillType == "Date">
     @Override
-    public java.sql.Date getPrimitiveJavaObject(Object o) {
+    public ${entry.javaType} getPrimitiveJavaObject(Object o) {
     <#if mode == "Optional">
       if (o == null) {
         return null;
@@ -246,14 +253,18 @@ public class Drill${entry.drillType}${entry.hiveOI} {
     <#else>
       final DateHolder h = (DateHolder) o;
     </#if>
+    <#if entry.javaType == "org.apache.hadoop.hive.common.type.Date">
+      return org.apache.hadoop.hive.common.type.Date.ofEpochMilli(h.value);
+    <#else>
       org.joda.time.LocalDate localDate = new org.joda.time.LocalDate(h.value, org.joda.time.DateTimeZone.UTC);
       // Use "toDate()" to get java.util.Date object with exactly the same year the same year, month and day as Joda date.
       // See more in Javadoc for "LocalDate#toDate()"
-      return new java.sql.Date(localDate.toDate().getTime());
+      return new ${entry.javaType}(localDate.toDate().getTime());
+    </#if>
     }
 
     @Override
-    public DateWritable getPrimitiveWritableObject(Object o) {
+    public ${entry.writableType} getPrimitiveWritableObject(Object o) {
     <#if mode == "Optional">
       if (o == null) {
         return null;
@@ -262,10 +273,14 @@ public class Drill${entry.drillType}${entry.hiveOI} {
     <#else>
       final DateHolder h = (DateHolder) o;
     </#if>
+    <#if entry.javaType == "org.apache.hadoop.hive.common.type.Date">
+      return new ${entry.writableType}(org.apache.hadoop.hive.common.type.Date.ofEpochMilli(h.value));
+    <#else>
       org.joda.time.LocalDate localDate = new org.joda.time.LocalDate(h.value, org.joda.time.DateTimeZone.UTC);
       // Use "toDate()" to get java.util.Date object with exactly the same year the same year, month and day as Joda date.
       // See more in Javadoc for "LocalDate#toDate()"
-      return new DateWritable(new java.sql.Date(localDate.toDate().getTime()));
+      return new ${entry.writableType}(new ${entry.javaType}(localDate.toDate().getTime()));
+    </#if>
     }
 
 <#else>
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 4a2bb58..35dca62 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -102,7 +102,7 @@ public class HiveMetadataProvider {
     HiveTableWithColumnCache table = hiveReadEntry.getTable();
     try {
       if (!isPartitionedTable) {
-        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        Properties properties = new Table(table).getMetadata();
         HiveStats stats = HiveStats.getStatsFromProps(properties);
         if (stats.valid()) {
           return stats;
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index f7c7099..531284d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -67,7 +67,6 @@ import org.apache.drill.exec.work.ExecErrorConstants;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -75,6 +74,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -447,7 +447,7 @@ public class HiveUtilities {
       }
       final HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(job, storageHandlerClass);
       TableDesc tableDesc = new TableDesc();
-      tableDesc.setProperties(MetaStoreUtils.getTableMetadata(table));
+      tableDesc.setProperties(new org.apache.hadoop.hive.ql.metadata.Table(table).getMetadata());
       storageHandler.configureInputJobProperties(tableDesc, table.getParameters());
       return (Class<? extends InputFormat<?, ?>>) storageHandler.getInputFormatClass();
     } else {
@@ -468,7 +468,7 @@ public class HiveUtilities {
   }
 
   /**
-   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
+   * Wrapper around {@code MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
    * which also adds parameters from table to properties returned by that method.
    *
    * @param partition the source of partition level parameters
@@ -477,16 +477,20 @@ public class HiveUtilities {
    */
   public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
     restoreColumns(table, partition);
-    Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
-
-    // SerDe expects properties from Table, but above call doesn't add Table properties.
-    // Include Table properties in final list in order to not to break SerDes that depend on
-    // Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
-    table.getParameters().entrySet().stream()
-        .filter(e -> e.getKey() != null && e.getValue() != null)
-        .forEach(e -> properties.put(e.getKey(), e.getValue()));
-
-    return properties;
+    try {
+      Properties properties = new org.apache.hadoop.hive.ql.metadata.Partition(new org.apache.hadoop.hive.ql.metadata.Table(table), partition).getMetadataFromPartitionSchema();
+
+      // SerDe expects properties from Table, but above call doesn't add Table properties.
+      // Include Table properties in final list in order to not to break SerDes that depend on
+      // Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
+      table.getParameters().entrySet().stream()
+          .filter(e -> e.getKey() != null && e.getValue() != null)
+          .forEach(e -> properties.put(e.getKey(), e.getValue()));
+
+      return properties;
+    } catch (HiveException e) {
+      throw new DrillRuntimeException(e);
+    }
   }
 
   /**
@@ -507,17 +511,16 @@ public class HiveUtilities {
   }
 
   /**
-   * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
+   * Wrapper around {@code MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
    * which also sets columns from table cache to table and returns properties returned by
-   * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+   * {@code MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
    *
    * @param table Hive table with cached columns
    * @return Hive table metadata
    */
   public static Properties getTableMetadata(HiveTableWithColumnCache table) {
     restoreColumns(table, null);
-    return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table.getParameters(),
-      table.getDbName(), table.getTableName(), table.getPartitionKeys());
+    return new org.apache.hadoop.hive.ql.metadata.Table(table).getMetadata();
   }
 
   /**
@@ -587,7 +590,7 @@ public class HiveUtilities {
   public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescriptor sd) {
 
     if (AcidUtils.isTablePropertyTransactional(job)) {
-      AcidUtils.setTransactionalTableScan(job, true);
+      HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
 
       // No work is needed, if schema evolution is used
       if (Utilities.isSchemaEvolutionEnabled(job, true) && job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS) != null &&
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/client/DrillHiveMetaStoreClientFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/client/DrillHiveMetaStoreClientFactory.java
index f392ba3..0ec9202 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/client/DrillHiveMetaStoreClientFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/client/DrillHiveMetaStoreClientFactory.java
@@ -24,8 +24,9 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Provides factory methods for initialization of {@link DrillHiveMetaStoreClient} instances.
@@ -65,7 +66,7 @@ public final class DrillHiveMetaStoreClientFactory {
           // delegation tokens).
           String delegationToken = processUserMetaStoreClient.getDelegationToken(userName, userName);
           try {
-            Utils.setTokenStr(ugiForRpc, delegationToken, DrillHiveMetaStoreClientWithAuthorization.DRILL2HMS_TOKEN);
+            setTokenStr(ugiForRpc, delegationToken, DrillHiveMetaStoreClientWithAuthorization.DRILL2HMS_TOKEN);
           } catch (IOException e) {
             throw new DrillRuntimeException("Couldn't setup delegation token in the UGI for Hive MetaStoreClient", e);
           }
@@ -89,6 +90,37 @@ public final class DrillHiveMetaStoreClientFactory {
   }
 
   /**
+   * Create a delegation token object for the given token string and service.
+   * Add the token to the given UGI.
+   *
+   * @param ugi          user group information
+   * @param tokenStr     token string
+   * @param tokenService token service
+   * @throws IOException if an error happens while decoding the token string
+   */
+  public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
+      throws IOException {
+    Token<?> delegationToken = createToken(tokenStr, tokenService);
+    ugi.addToken(delegationToken);
+  }
+
+  /**
+   * Create a new token using the given string and service
+   *
+   * @param tokenStr     token string
+   * @param tokenService token service
+   * @return {@link Token} instance with decoded string
+   * @throws IOException if an error happens while decoding the token string
+   */
+  private static Token<?> createToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<?> delegationToken = new Token<>();
+    delegationToken.decodeFromUrlString(tokenStr);
+    delegationToken.setService(new Text(tokenService));
+    return delegationToken;
+  }
+
+  /**
    * Create a DrillMetaStoreClient that can be shared across multiple users. This is created when impersonation is
    * disabled.
    *
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
index a1b4822..6d1c3ed 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
@@ -18,21 +18,25 @@
 package org.apache.drill.exec.store.hive.writers.primitive;
 
 import org.apache.drill.exec.vector.complex.writer.DateWriter;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-public class HiveDateWriter extends AbstractSingleValueWriter<DateObjectInspector, DateWriter> {
+import java.sql.Date;
 
-  public HiveDateWriter(DateObjectInspector inspector, DateWriter writer) {
+public class HiveDateWriter extends AbstractSingleValueWriter<PrimitiveObjectInspector, DateWriter> {
+
+  public HiveDateWriter(PrimitiveObjectInspector inspector, DateWriter writer) {
     super(inspector, writer);
   }
 
   @Override
   public void write(Object value) {
-    final java.sql.Date dateValue = inspector.getPrimitiveJavaObject(value);
-    final DateTime date = new DateTime(dateValue.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-    writer.writeDate(date.getMillis());
+    String dateString = PrimitiveObjectInspectorUtils.getString(value, inspector);
+    long dateMillis = new DateTime(Date.valueOf(dateString).getTime())
+        .withZoneRetainFields(DateTimeZone.UTC).getMillis();
+    writer.writeDate(dateMillis);
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
index 9bc2b6a..72108c5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
@@ -18,21 +18,25 @@
 package org.apache.drill.exec.store.hive.writers.primitive;
 
 import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-public class HiveTimestampWriter extends AbstractSingleValueWriter<TimestampObjectInspector, TimeStampWriter> {
+import java.sql.Timestamp;
 
-  public HiveTimestampWriter(TimestampObjectInspector inspector, TimeStampWriter writer) {
+public class HiveTimestampWriter extends AbstractSingleValueWriter<PrimitiveObjectInspector, TimeStampWriter> {
+
+  public HiveTimestampWriter(PrimitiveObjectInspector inspector, TimeStampWriter writer) {
     super(inspector, writer);
   }
 
   @Override
   public void write(Object value) {
-    final java.sql.Timestamp timestampValue = inspector.getPrimitiveJavaObject(value);
-    final DateTime ts = new DateTime(timestampValue.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-    writer.writeTimeStamp(ts.getMillis());
+    String timestampString = PrimitiveObjectInspectorUtils.getString(value, inspector);
+    long timestampMillis = new DateTime(Timestamp.valueOf(timestampString).getTime())
+        .withZoneRetainFields(DateTimeZone.UTC).getMillis();
+    writer.writeTimeStamp(timestampMillis);
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/logging/log4j/util/Strings.java
similarity index 52%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
copy to contrib/storage-hive/core/src/main/java/org/apache/logging/log4j/util/Strings.java
index a1b4822..7ec47c5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/logging/log4j/util/Strings.java
@@ -15,24 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.writers.primitive;
+package org.apache.logging.log4j.util;
 
-import org.apache.drill.exec.vector.complex.writer.DateWriter;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
+import org.apache.commons.lang3.StringUtils;
 
-public class HiveDateWriter extends AbstractSingleValueWriter<DateObjectInspector, DateWriter> {
-
-  public HiveDateWriter(DateObjectInspector inspector, DateWriter writer) {
-    super(inspector, writer);
-  }
+/**
+ * Hive uses a class with the same fully qualified name from log4j-1.2-api.
+ * Added this class to avoid ClassNotFound errors from Hive.
+ *
+ * See <a href="https://issues.apache.org/jira/browse/HIVE-23088">HIVE-23088</a> for the problem description.
+ */
+public class Strings {
 
-  @Override
-  public void write(Object value) {
-    final java.sql.Date dateValue = inspector.getPrimitiveJavaObject(value);
-    final DateTime date = new DateTime(dateValue.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-    writer.writeDate(date.getMillis());
+  public static boolean isBlank(final String s) {
+    return StringUtils.isBlank(s);
   }
-
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
index ad8c31c..0bf5d42 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
@@ -167,6 +167,8 @@ public class HiveTestFixture {
       driverOption(ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
       driverOption(ConfVars.METASTORE_AUTO_CREATE_ALL, Boolean.toString(true));
       driverOption(ConfVars.METASTORE_SCHEMA_VERIFICATION, Boolean.toString(false));
+      driverOption(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING, Boolean.toString(false));
+      driverOption(ConfVars.HIVESESSIONSILENT, Boolean.toString(true));
       driverOption(ConfVars.HIVE_CBO_ENABLED, Boolean.toString(false));
     }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
index 8518fca..537d0f4 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
@@ -27,9 +27,10 @@ import java.util.Set;
 
 import org.apache.drill.test.QueryBuilder;
 import org.apache.drill.test.TestTools;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.util.ComparableVersion;
+import org.apache.hive.common.util.HiveVersionInfo;
 import org.junit.AssumptionViolatedException;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -46,27 +47,19 @@ public class HiveTestUtilities {
   private static final Set<PosixFilePermission> ALL_POSIX_PERMISSIONS = EnumSet.allOf(PosixFilePermission.class);
 
   /**
-   * Execute the give <i>query</i> on given <i>hiveDriver</i> instance. If a {@link CommandNeedRetryException}
-   * exception is thrown, it tries upto 3 times before returning failure.
-   * @param hiveDriver
-   * @param query
+   * Executes the given <i>query</i> on the given <i>hiveDriver</i> instance.
    */
   public static void executeQuery(Driver hiveDriver, String query) {
-    CommandProcessorResponse response = null;
-    boolean failed = false;
-    int retryCount = 3;
-
+    CommandProcessorResponse response;
     try {
       response = hiveDriver.run(query);
-    } catch(CommandNeedRetryException ex) {
-      if (--retryCount == 0) {
-        failed = true;
-      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
 
-    if (failed || response.getResponseCode() != 0 ) {
+    if (response.getResponseCode() != 0 ) {
       throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'",
-          query, (response != null ? response.getErrorMessage() : "")));
+          query, response.getErrorMessage()));
     }
   }
 
@@ -142,6 +135,14 @@ public class HiveTestUtilities {
   }
 
   /**
+   * Checks whether the current Hive version is not less than 3.0.
+   */
+  public static boolean isHive3() {
+    return new ComparableVersion(HiveVersionInfo.getVersion())
+        .compareTo(new ComparableVersion("3.0")) >= 0;
+  }
+
+  /**
    * Checks if current version is supported by Hive.
    *
    * @throws AssumptionViolatedException if current version is not supported by Hive,
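
For reference, a minimal sketch of the version check that isHive3() performs, using the same ComparableVersion comparison but against hard-coded sample version strings instead of HiveVersionInfo.getVersion(); the sample versions below are illustrative only.

  import org.apache.hadoop.util.ComparableVersion;

  public class HiveVersionCheckSketch {

    // Same comparison as isHive3(): any version >= 3.0 counts as Hive 3.
    static boolean isHive3(String version) {
      return new ComparableVersion(version).compareTo(new ComparableVersion("3.0")) >= 0;
    }

    public static void main(String[] args) {
      // Illustrative inputs; the real check reads HiveVersionInfo.getVersion() at runtime.
      for (String version : new String[] {"2.3.2", "3.0.0", "3.1.2"}) {
        // On Hive 3 the test DDL appends DISABLE REWRITE, since materialized views
        // are otherwise eligible for auto-rewriting there.
        String clause = isHive3(version) ? "DISABLE REWRITE" : "";
        System.out.println(version + " -> isHive3=" + isHive3(version) + ", clause='" + clause + "'");
      }
    }
  }
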
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
index 4fc85cc..33e77ea 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
@@ -27,14 +27,13 @@ import org.apache.drill.exec.hive.HiveTestUtilities;
 import org.apache.drill.exec.impersonation.BaseTestImpersonation;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
 import org.apache.drill.test.TestBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.junit.BeforeClass;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
@@ -85,6 +84,9 @@ public class BaseTestHiveImpersonation extends BaseTestImpersonation {
     hiveConf.set(ConfVars.METASTORE_SCHEMA_VERIFICATION.varname, "false");
     hiveConf.set(ConfVars.METASTORE_AUTO_CREATE_ALL.varname, "true");
     hiveConf.set(ConfVars.HIVE_CBO_ENABLED.varname, "false");
+    hiveConf.set(ConfVars.HIVESTATSAUTOGATHER.varname, "false");
+    hiveConf.set(ConfVars.HIVESTATSCOLAUTOGATHER.varname, "false");
+    hiveConf.set(ConfVars.HIVESESSIONSILENT.varname, "true");
 
     // Set MiniDFS conf in HiveConf
     hiveConf.set(FS_DEFAULT_NAME_KEY, dfsConf.get(FS_DEFAULT_NAME_KEY));
@@ -97,11 +99,29 @@ public class BaseTestHiveImpersonation extends BaseTestImpersonation {
   }
 
   protected static void startHiveMetaStore() throws Exception {
-    final int port = MetaStoreUtils.findFreePort();
+    Class<?> metaStoreUtilsClass;
+    Class<?> hadoopThriftAuthBridgeClass;
+    Class<?> confClass;
+    Object hadoopThriftAuthBridge;
+    // TODO: remove the reflection workaround once all supported profiles are switched to Hive 3+
+    try {
+      metaStoreUtilsClass = Class.forName("org.apache.hadoop.hive.metastore.utils.MetaStoreUtils");
+      hadoopThriftAuthBridgeClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge");
+      hadoopThriftAuthBridge = hadoopThriftAuthBridgeClass.getDeclaredMethod("getBridge").invoke(null);
+      confClass = Configuration.class;
+    } catch (ClassNotFoundException e) {
+      metaStoreUtilsClass = Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
+      hadoopThriftAuthBridgeClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge");
+      hadoopThriftAuthBridge = Class.forName("org.apache.hadoop.hive.shims.ShimLoader")
+          .getDeclaredMethod("getHadoopThriftAuthBridge").invoke(null);
+      confClass = HiveConf.class;
+    }
+    final int port = (int) metaStoreUtilsClass.getDeclaredMethod("findFreePort").invoke(null);
 
     hiveConf.set(METASTOREURIS.varname, "thrift://localhost:" + port);
 
-    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+    metaStoreUtilsClass.getDeclaredMethod("startMetaStore", int.class, hadoopThriftAuthBridgeClass, confClass)
+        .invoke(null, port, hadoopThriftAuthBridge, hiveConf);
   }
 
   protected static HiveStoragePluginConfig createHiveStoragePlugin(final Map<String, String> hiveConfig) throws Exception {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index d5a3f72..376b49d 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -25,6 +25,7 @@ import java.sql.Timestamp;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.hive.HiveTestUtilities;
 import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTestQuery;
@@ -442,7 +443,8 @@ public class HiveTestDataGenerator {
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW readtest_view AS SELECT * FROM readtest");
     executeQuery(hiveDriver, "CREATE VIEW IF NOT EXISTS hive_view AS SELECT * FROM kv");
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW kv_native_view AS SELECT * FROM kv_native");
-    executeQuery(hiveDriver, "CREATE MATERIALIZED VIEW IF NOT EXISTS hive_view_m AS SELECT * FROM kv WHERE key = 1");
+    String disableRewrite = HiveTestUtilities.isHive3() ? "DISABLE REWRITE" : "";
+    executeQuery(hiveDriver, String.format("CREATE MATERIALIZED VIEW IF NOT EXISTS hive_view_m %s AS SELECT * FROM kv WHERE key = 1", disableRewrite));
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW view_over_hive_view AS SELECT * FROM hive_view WHERE key BETWEEN 2 AND 3");
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW db1.two_table_view AS SELECT COUNT(dk.key) dk_key_count FROM db1.avro dk " +
         "INNER JOIN kv ON kv.key = dk.key");
@@ -592,7 +594,7 @@ public class HiveTestDataGenerator {
 
   private String generateTestDataWithHeadersAndFooters(String tableName, int rowCount, int headerLines, int footerLines) {
     StringBuilder sb = new StringBuilder();
-    sb.append("insert into table ").append(tableName).append(" (key, value) values ");
+    sb.append("insert into table ").append(tableName).append(" values ");
     sb.append(StringUtils.repeat("('key_header', 'value_header')", ",", headerLines));
     if (headerLines > 0) {
       sb.append(",");
diff --git a/exec/rpc/pom.xml b/exec/rpc/pom.xml
index dead5a4..81caa5b 100644
--- a/exec/rpc/pom.xml
+++ b/exec/rpc/pom.xml
@@ -45,7 +45,7 @@
       <artifactId>drill-memory-base</artifactId>
       <version>${project.version}</version>
     </dependency>
-    
+
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -54,7 +54,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
       <classifier>linux-x86_64</classifier>
-      <version>4.0.48.Final</version>
+      <version>${netty.version}</version>
       <exclusions>
         <exclusion>
           <groupId>io.netty</groupId>
@@ -84,10 +84,4 @@
     </dependency>
   </dependencies>
 
-
-  <build>
-  </build>
-
-
-
 </project>
diff --git a/pom.xml b/pom.xml
index bacd6bd..7f46c3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
       Currently Hive storage plugin only supports Apache Hive 2.3.2 or vendor specific variants of the
       Apache Hive 2.3.2. If the version is changed, make sure the jars and their dependencies are updated.
     -->
-    <hive.version>2.3.2</hive.version>
+    <hive.version>3.1.2</hive.version>
     <hadoop.version>3.2.1</hadoop.version>
     <hbase.version>2.2.2</hbase.version>
     <fmpp.version>1.0</fmpp.version>
@@ -112,6 +112,7 @@
     <surefire.version>3.0.0-M4</surefire.version>
     <commons.compress.version>1.19</commons.compress.version>
     <hikari.version>3.4.2</hikari.version>
+    <netty.version>4.0.48.Final</netty.version>
   </properties>
 
   <scm>
@@ -1002,13 +1003,11 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-handler</artifactId>
-      <version>4.0.48.Final</version>
     </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
-      <version>4.0.48.Final</version>
     </dependency>
 
     <dependency>
@@ -1692,6 +1691,16 @@
         <optional>true</optional>
       </dependency>
       <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-handler</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-common</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.tdunning</groupId>
         <artifactId>json</artifactId>
         <version>1.8</version>


[drill] 04/04: DRILL-7673: View set query fails with NPE for non-existing option

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 6d98c127548ef13ab0c2ec61cc59299ee00a2232
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Sat Mar 28 00:49:50 2020 +0200

    DRILL-7673: View set query fails with NPE for non-existing option
    
    closes #2043
---
 .../planner/sql/handlers/SetOptionHandler.java     |  6 ++++-
 .../planner/sql/handlers/SetOptionHandlerTest.java | 27 +++++++++++++++++-----
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 41a1b78..d041444 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlSetOption;
+import org.apache.drill.exec.server.options.OptionDefinition;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionScope;
@@ -63,9 +64,12 @@ public class SetOptionHandler extends AbstractSqlSetHandler {
     SqlNode optionValue = statement.getValue();
 
     if (optionValue == null) {
+      // OptionManager.getOptionDefinition() call ensures that the specified option name is valid
+      OptionDefinition optionDefinition = optionManager.getOptionDefinition(optionName);
       String value = String.valueOf(optionManager.getOption(optionName).getValue());
 
-      return DirectPlan.createDirectPlan(context, new SetOptionViewResult(optionName, value));
+      // obtains option name from OptionDefinition to use the name as defined in the option, rather than what the user provided
+      return DirectPlan.createDirectPlan(context, new SetOptionViewResult(optionDefinition.getValidator().getOptionName(), value));
     } else {
       if (optionScope == OptionValue.OptionScope.SYSTEM) {
         checkAdminPrivileges(context.getOptions());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
index def1240..3f2854d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.sql.handlers;
 
 import org.apache.drill.categories.SqlTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.ClassCompilerSelector;
 import org.apache.drill.test.ClusterFixture;
@@ -27,6 +28,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
 @Category(SqlTest.class)
 public class SetOptionHandlerTest extends ClusterTest {
 
@@ -42,11 +47,11 @@ public class SetOptionHandlerTest extends ClusterTest {
             ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION)
         .singletonString();
 
-    boolean newValue = !Boolean.valueOf(defaultValue);
+    boolean newValue = !Boolean.parseBoolean(defaultValue);
     try {
       client.alterSession(ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION, newValue);
 
-      String changedValue = client.queryBuilder()
+      String changedValue = queryBuilder()
           .sql("SELECT val from sys.options where name = '%s' limit 1",
               ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION)
           .singletonString();
@@ -59,32 +64,42 @@ public class SetOptionHandlerTest extends ClusterTest {
 
   @Test
   public void testViewSetQuery() throws Exception {
-    client.testBuilder()  // BIT
+    testBuilder()  // BIT
         .sqlQuery("SET `%s`", ExecConstants.ENABLE_ITERATOR_VALIDATION_OPTION)
         .unOrdered()
         .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
             ExecConstants.ENABLE_ITERATOR_VALIDATION_OPTION)
         .go();
 
-    client.testBuilder()  // BIGINT
+    testBuilder()  // BIGINT
         .sqlQuery("SET `%s`", ExecConstants.OUTPUT_BATCH_SIZE)
         .unOrdered()
         .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
             ExecConstants.OUTPUT_BATCH_SIZE)
         .go();
 
-    client.testBuilder()  // FLOAT
+    testBuilder()  // FLOAT
         .sqlQuery("SET `%s`", ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR)
         .unOrdered()
         .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
             ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR)
         .go();
 
-    client.testBuilder()  // VARCHAR
+    testBuilder()  // VARCHAR
         .sqlQuery("SET `%s`", ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL)
         .unOrdered()
         .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
             ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL)
         .go();
   }
+
+  @Test
+  public void testViewSetWithIncorrectOption() throws Exception {
+    try {
+      run("set `non-existing option`");
+      fail();
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), startsWith("VALIDATION ERROR"));
+    }
+  }
 }