You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2020/02/04 11:53:03 UTC

[drill] branch master updated: DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f7c58f  DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0
9f7c58f is described below

commit 9f7c58fc5a47cfe67e5fb5f5f35971cf534133d5
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed Jan 22 15:55:09 2020 +0200

    DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0
    
    1. Upgraded Iceberg to the commit that supports Parquet 1.11.0.
    2. Removed workaround in ExpirationHandler and used built-in logic from Iceberg library.
    3. Updated description about expiration logic in README.md.
---
 metastore/iceberg-metastore/README.md              |  10 +-
 metastore/iceberg-metastore/pom.xml                |  12 +-
 .../drill/metastore/iceberg/IcebergMetastore.java  |   6 +-
 .../iceberg/components/tables/IcebergTables.java   |   4 +-
 .../iceberg/config/IcebergConfigConstants.java     |   5 -
 .../iceberg/operate/ExpirationHandler.java         | 263 +++------------------
 .../metastore/iceberg/operate/IcebergModify.java   |   5 +-
 .../src/main/resources/drill-metastore-module.conf |  14 +-
 .../iceberg/operate/TestExpirationHandler.java     | 149 +++++-------
 9 files changed, 119 insertions(+), 349 deletions(-)

diff --git a/metastore/iceberg-metastore/README.md b/metastore/iceberg-metastore/README.md
index 3db08d1..094154b 100644
--- a/metastore/iceberg-metastore/README.md
+++ b/metastore/iceberg-metastore/README.md
@@ -183,8 +183,8 @@ expiration process is launched.
 Iceberg table generates metadata for each modification operation:
 snapshot, manifest file, table metadata file. Also when performing delete operation,
 previously stored data files are not deleted. These files with the time
-can occupy lots of space. `ExpirationHandler` allows to expire outdated metadata and
-data files after configured time period (`drill.metastore.iceberg.expiration.period`).
-If expiration period is not indicated, zero or negative, expiration won't be performed.
-`ExpirationHandler` is called after each modification operation, it checks if expiration period
-has elapsed and submits expiration process in a separate thread.
+can occupy lots of space. Two table properties `write.metadata.delete-after-commit.enabled`
+and `write.metadata.previous-versions-max` control expiration process.
+Metadata files will be expired automatically if `write.metadata.delete-after-commit.enabled` 
+is enabled. Snapshots and data files will be expired using `ExpirationHandler` 
+after each commit operation based on the same table properties values.
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 818ea08..aa68271 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -31,7 +31,7 @@
   <name>metastore/Drill Iceberg Metastore</name>
 
   <properties>
-    <iceberg.version>0.7.0-incubating</iceberg.version>
+    <iceberg.version>2d75130</iceberg.version>
     <caffeine.version>2.7.0</caffeine.version>
   </properties>
 
@@ -47,27 +47,27 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-parquet</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-data</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-core</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-common</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-api</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
index e223c5a..94bc6bf 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
@@ -25,7 +25,6 @@ import org.apache.drill.metastore.components.views.Views;
 import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
 import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.MapDifference;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -59,7 +58,6 @@ public class IcebergMetastore implements Metastore {
   private final org.apache.iceberg.Tables tables;
   private final String baseLocation;
   private final Map<String, String> commonProperties;
-  private final ExpirationHandler expirationHandler;
 
   /**
    * Table properties for each Iceberg table should be updated only once,
@@ -77,7 +75,6 @@ public class IcebergMetastore implements Metastore {
     this.tables = new HadoopTables(new Configuration(configuration));
     this.baseLocation = baseLocation(new Configuration(configuration));
     this.commonProperties = properties(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES);
-    this.expirationHandler = new ExpirationHandler(config, new Configuration(configuration));
   }
 
   @Override
@@ -85,7 +82,7 @@ public class IcebergMetastore implements Metastore {
     Table table = loadTable(IcebergConfigConstants.COMPONENTS_TABLES_LOCATION,
       IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES,
       IcebergTables.SCHEMA, Tables.class);
-    return new IcebergTables(table, expirationHandler);
+    return new IcebergTables(table);
   }
 
   @Override
@@ -273,6 +270,5 @@ public class IcebergMetastore implements Metastore {
 
   @Override
   public void close() {
-    expirationHandler.close();
   }
 }
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index f4d32e9..a28248d 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -58,9 +58,9 @@ public class IcebergTables implements Tables, MetastoreContext<TableMetadataUnit
   private final Table table;
   private final ExpirationHandler expirationHandler;
 
-  public IcebergTables(Table table, ExpirationHandler expirationHandler) {
+  public IcebergTables(Table table) {
     this.table = table;
-    this.expirationHandler = expirationHandler;
+    this.expirationHandler = new ExpirationHandler(table);
   }
 
   public MetastoreContext<TableMetadataUnit> context() {
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
index 4419f68..6f326ac 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
@@ -52,11 +52,6 @@ public interface IcebergConfigConstants {
   String RELATIVE_PATH = LOCATION_NAMESPACE + "relative_path";
 
   /**
-   * Defines config which provides expiration period value.
-   */
-  String EXPIRATION_PERIOD = BASE + "expiration.period";
-
-  /**
    * Drill Iceberg Metastore components configuration properties namespace.
    */
   String COMPONENTS = BASE + "components.";
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
index f06d496..25cd340 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
@@ -17,33 +17,15 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
-import com.typesafe.config.ConfigValue;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Iceberg table generates metadata for each modification operation:
@@ -51,226 +33,55 @@ import java.util.regex.Pattern;
  * previously stored data files are not deleted. These files with the time
  * can occupy lots of space.
  * <p/>
- * Expiration handler expires outdated metadata and data files after configured expiration period.
- * Expiration period is set in the Iceberg Metastore config {@link IcebergConfigConstants#EXPIRATION_PERIOD}.
- * Units should correspond to {@link ChronoUnit} values that do not have estimated duration
- * (millis, seconds, minutes, hours, days).
- * If expiration period is not set, zero or negative, expiration process will not be executed.
- * <p/>
- * Expiration process is launched using executor service which allows to execute only one thread at a time,
- * idle thread is not kept in the core pool since it is assumed that expiration process won't be launched to often.
- * <p/>
- * During Drillbit shutdown, if there are expiration tasks in the queue, they will be discarded in order to
- * unblock Drillbit shutdown process.
+ * Table metadata in Iceberg is expired automatically
+ * if {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} set to true.
+ * Number of metadata files to be retained is configured using {@link TableProperties#METADATA_PREVIOUS_VERSIONS_MAX}.
+ * Snapshots and data expiration should be called manually to align with metadata expiration process,
+ * the same table properties are used to determine if expiration is needed and which number
+ * of snapshots should be retained.
  */
-public class ExpirationHandler implements AutoCloseable {
+public class ExpirationHandler {
 
   private static final Logger logger = LoggerFactory.getLogger(ExpirationHandler.class);
 
-  private static final Pattern METADATA_VERSION_PATTERN = Pattern.compile("^v([0-9]+)\\..*");
-
-  // contains Iceberg table location and its last expiration time
-  private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>();
-  private final Configuration configuration;
-  private final long expirationPeriod;
-  private volatile ExecutorService executorService;
-
-  public ExpirationHandler(DrillConfig config, Configuration configuration) {
-    this.configuration = configuration;
-    this.expirationPeriod = expirationPeriod(config);
-    logger.debug("Drill Iceberg Metastore expiration period: {}", expirationPeriod);
-  }
-
-  /**
-   * Checks if expiration process needs to be performed for the given Iceberg table
-   * by comparing stored last expiration time.
-   * If difference between last expiration time and current time is more or equal to
-   * expiration period, launches expiration process.
-   * If expiration period is zero or negative, no expiration process will be launched.
-   *
-   * @param table Iceberg table instance
-   * @return true if expiration process was launched, false otherwise
-   */
-  public boolean expire(Table table) {
-    if (expirationPeriod <= 0) {
-      return false;
-    }
-
-    long current = System.currentTimeMillis();
-    Long last = expirationStatus.putIfAbsent(table.location(), current);
-
-    if (last != null && current - last >= expirationPeriod) {
-      expirationStatus.put(table.location(), current);
+  private final Table table;
+  private final boolean shouldExpire;
+  private final int retainNumber;
 
-      ExecutorService executorService = executorService();
-      executorService.submit(() -> {
-        logger.debug("Expiring Iceberg table [{}] metadata", table.location());
-        table.expireSnapshots()
-          .expireOlderThan(current)
-          .commit();
-        // TODO: Replace with table metadata expiration through Iceberg API
-        //       when https://github.com/apache/incubator-iceberg/issues/181 is resolved
-        //       table.expireTableMetadata().expireOlderThan(current).commit();
-        expireTableMetadata(table);
-      });
-      return true;
-    }
-    return false;
-  }
-
-  public long expirationPeriod() {
-    return expirationPeriod;
-  }
+  public ExpirationHandler(Table table) {
+    this.table = table;
 
-  @Override
-  public void close() {
-    if (executorService != null) {
-      // unlike shutdown(), shutdownNow() discards all queued waiting tasks
-      // this is done in order to unblock Drillbit shutdown
-      executorService.shutdownNow();
-    }
-  }
-
-  private long expirationPeriod(DrillConfig config) {
-    if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) {
-      Duration duration = config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream()
-        .map(this::duration)
-        .reduce(Duration.ZERO, Duration::plus);
-      return duration.toMillis();
-    }
-    return 0;
-  }
-
-  private Duration duration(Map.Entry<String, ConfigValue> entry) {
-    String amountText = String.valueOf(entry.getValue().unwrapped());
-    String unitText = entry.getKey().toUpperCase();
-    try {
-      long amount = Long.parseLong(amountText);
-      ChronoUnit unit = ChronoUnit.valueOf(unitText);
-      return Duration.of(amount, unit);
-    } catch (NumberFormatException e) {
-      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
-        "Unable to convert [%s] into long", amountText), e);
-    } catch (IllegalArgumentException e) {
-      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
-        "Unable to convert [%s] into [%s]", unitText, ChronoUnit.class.getCanonicalName()), e);
-    }
+    Map<String, String> properties = table.properties();
+    this.shouldExpire = PropertyUtil.propertyAsBoolean(properties,
+      TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+      TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
+    this.retainNumber = PropertyUtil.propertyAsInt(properties,
+      TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+      TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
   }
 
   /**
-   * Initializes executor service instance using DCL.
-   * Created thread executor instance allows to execute only one thread at a time
-   * but unlike single thread executor does not keep this thread in the pool.
-   * Custom thread factory is used to define Iceberg Metastore specific thread names.
-   *
-   * @return executor service instance
+   * Expires snapshots and related data if needed
+   * based on the given table properties values.
    */
-  private ExecutorService executorService() {
-    if (executorService == null) {
-      synchronized (this) {
-        if (executorService == null) {
-          this.executorService = new ThreadPoolExecutor(0, 1, 0L,
-            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new IcebergThreadFactory());
-        }
-      }
+  public void expire() {
+    if (shouldExpire) {
+      table.expireSnapshots()
+        .expireOlderThan(System.currentTimeMillis())
+        .retainLast(retainNumber)
+        .commit();
     }
-    return executorService;
   }
 
   /**
-   * Expires outdated Iceberg table metadata files.
-   * Reads current Iceberg table metadata version from version-hint.text file
-   * and deletes all metadata files that end with ".metadata.json" and have
-   * version less than current.
-   * <p/>
-   * Should be replaced with
-   * <code>table.expireTableMetadata().expireOlderThan(current).commit();</code>
-   * when <a href="https://github.com/apache/incubator-iceberg/issues/181">Issue#181</a>
-   * is resolved.
-   *
-   * @param table Iceberg table instance
+   * Expires snapshots and related data and ignores possible exceptions.
    */
-  private void expireTableMetadata(Table table) {
+  public void expireQuietly() {
     try {
-      String location = table.location();
-      Path metadata = new Path(location, "metadata");
-      FileSystem fs = metadata.getFileSystem(configuration);
-      for (FileStatus fileStatus : listExpiredMetadataFiles(fs, metadata)) {
-        if (fs.delete(fileStatus.getPath(), false)) {
-          logger.debug("Deleted Iceberg table [{}] metadata file [{}]", table.location(), fileStatus.getPath());
-        }
-      }
-    } catch (NumberFormatException | IOException e) {
-      logger.warn("Unable to expire Iceberg table [{}] metadata files", table.location(), e);
-    }
-  }
-
-  /**
-   * Reads current Iceberg table metadata version from version-hint.text file
-   * and returns all metadata files that end with ".metadata.json" and have
-   * version less than current.
-   *
-   * @param fs file system
-   * @param metadata pth to Iceberg metadata
-   * @return metadata files with version less than current
-   * @throws IOException in case of error listing file statuses
-   */
-  private FileStatus[] listExpiredMetadataFiles(FileSystem fs, Path metadata) throws IOException {
-    int currentVersion = currentVersion(fs, metadata);
-    return fs.listStatus(metadata, path -> {
-      if (path.getName().endsWith(".metadata.json")) {
-        int version = parseVersion(path);
-        return version != -1 && currentVersion > version;
-      }
-      return false;
-    });
-  }
-
-  /**
-   * Reads current table metadata version from version-hint.text file.
-   *
-   * @param fs file system
-   * @param metadata table metadata path
-   * @return current table metadata version
-   * @throws IOException if unable to read current table metadata version
-   */
-  private int currentVersion(FileSystem fs, Path metadata) throws IOException {
-    Path versionHintFile = new Path(metadata, "version-hint.text");
-    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(versionHintFile)))) {
-      return Integer.parseInt(in.readLine().replace("\n", ""));
-    }
-  }
-
-  /**
-   * Extracts metadata version from table metadata file name.
-   * Example: v1.metadata.json -> 1, v15.metadata.json -> 15
-   *
-   * @param path table metadata file path
-   * @return metadata version
-   */
-  private int parseVersion(Path path) {
-    Matcher matcher = METADATA_VERSION_PATTERN.matcher(path.getName());
-    if (matcher.find() && matcher.groupCount() == 1) {
-      return Integer.parseInt(matcher.group(1));
-    }
-    throw new NumberFormatException("Unable to parse version for path " + path);
-  }
-
-  /**
-   * Wraps default thread factory and adds Iceberg Metastore prefix to the original thread name.
-   * Is used to uniquely identify Iceberg metastore threads.
-   * Example: drill-iceberg-metastore-pool-1-thread-1
-   */
-  private static class IcebergThreadFactory implements ThreadFactory {
-
-    private static final String THREAD_PREFIX = "drill-iceberg-metastore-";
-    private final ThreadFactory delegate = Executors.defaultThreadFactory();
-
-    @Override
-    public Thread newThread(Runnable runnable) {
-      Thread thread = delegate.newThread(runnable);
-      thread.setName(THREAD_PREFIX + thread.getName());
-      return thread;
+      expire();
+    } catch (ValidationException | CommitFailedException e) {
+      logger.warn("Unable to expire snapshots: {}", e.getMessage());
+      logger.debug("Error when expiring snapshots", e);
     }
   }
 }
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index a510f86..e29571f 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -80,7 +80,8 @@ public class IcebergModify<T> implements Modify<T> {
     operations.forEach(op -> op.add(transaction));
     transaction.commitTransaction();
 
-    // check if Iceberg table metadata needs to be expired after each modification operation
-    context.expirationHandler().expire(context.table());
+    // expiration process should not intervene with data modification operations
+    // if expiration fails, will attempt to expire the next time
+    context.expirationHandler().expireQuietly();
   }
 }
diff --git a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
index 33fe795..de1f1f9 100644
--- a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
+++ b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
@@ -34,18 +34,14 @@ drill.metastore.iceberg: {
     relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
   }
 
-  // Specifies time period after which outdated Iceberg table metadata will be expired,
-  // unit names must correspond to java.time.temporal.ChronoUnit enum values
-  // that do not have estimated duration (millis, seconds, minutes, hours, days).
-  // Example: hours: 10, minutes: 20
-  expiration.period: {
-    days: 5
-  }
-
   components: {
         // Common properties for all Iceberg tables from org.apache.iceberg.TableProperties can be specified
        common.properties: {
-          write.metadata.compression-codec: "none"
+         write.metadata.compression-codec: "none",
+         // Enables metadata expiration to avoid consuming space with historical data
+         // In Drill the same table properties apply to the snapshots expiration process as well
+         write.metadata.delete-after-commit.enabled: true
+         write.metadata.previous-versions-max: 2,
        },
 
     tables: {
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
index 90ef55b..d330b3b 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
@@ -17,127 +17,98 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.IcebergMetastore;
+import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestExpirationHandler extends IcebergBaseTest {
 
   @Test
-  public void testConfigEmpty() {
-    ExpirationHandler expirationHandler = new ExpirationHandler(DrillConfig.create(), baseHadoopConfig());
-    assertEquals(0, expirationHandler.expirationPeriod());
-  }
+  public void testNoExpiration() {
+    IcebergTables tables = tables("no-expiration", false, 2);
 
-  @Test
-  public void testConfigOneUnit() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(5)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(5), expirationHandler.expirationPeriod());
-  }
+    // check that there is no history
+    assertEquals(0, tables.table().history().size());
 
-  @Test
-  public void testConfigSeveralUnits() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(5))
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".minutes",
-        ConfigValueFactory.fromAnyRef(10)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(5) + TimeUnit.MINUTES.toMillis(10),
-      expirationHandler.expirationPeriod());
-  }
+    int operationsNumber = 5;
+    execute(tables, operationsNumber);
 
-  @Test
-  public void testConfigNegativeValue() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(-5)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(-5), expirationHandler.expirationPeriod());
+    // check that the number of executed operations is same as number of history records
+    assertEquals(operationsNumber, tables.table().history().size());
   }
 
   @Test
-  public void testConfigIncorrectUnit() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hour",
-        ConfigValueFactory.fromAnyRef(5)));
+  public void testExpiration() {
+    int retainNumber = 3;
+    IcebergTables tables = tables("expiration", true, retainNumber);
 
-    thrown.expect(IcebergMetastoreException.class);
-    new ExpirationHandler(config, baseHadoopConfig());
-  }
+    // check that there is no history
+    assertEquals(0, tables.table().history().size());
 
-  @Test
-  public void testConfigIncorrectValue() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef("abc")));
+    execute(tables, 5);
 
-    thrown.expect(IcebergMetastoreException.class);
-    new ExpirationHandler(config, baseHadoopConfig());
+    // check that number of history records corresponds to the expected retain number
+    assertEquals(retainNumber, tables.table().history().size());
   }
 
   @Test
-  public void testExpireZeroExpirationPeriod() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(0)));
-
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    Table table = mock(Table.class);
-    assertFalse(expirationHandler.expire(table));
-  }
+  public void testSubsequentExpiration() {
+    String name = "subsequent-expiration";
+    int retainNumber = 2;
+    int operationsNumber = 5;
 
-  @Test
-  public void testExpireNegativeExpirationPeriod() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(-10)));
-
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    Table table = mock(Table.class);
-    assertFalse(expirationHandler.expire(table));
-  }
+    IcebergTables initialTables = tables(name, false, retainNumber);
 
-  @Test
-  public void testExpireFirstTime() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(1)));
+    execute(initialTables, operationsNumber);
 
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+    // check that number of executed operations is the same as number of history records
+    assertEquals(operationsNumber, initialTables.table().history().size());
 
-    Table table = mock(Table.class);
-    when(table.location()).thenReturn("/tmp/table");
+    // update table configuration, allow expiration
+    IcebergTables updatedTables = tables(name, true, retainNumber);
 
-    assertFalse(expirationHandler.expire(table));
-  }
+    // check that number of history operation did not change
+    assertEquals(operationsNumber, updatedTables.table().history().size());
 
-  @Test
-  public void testExpireBefore() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".days",
-        ConfigValueFactory.fromAnyRef(1)));
+    execute(updatedTables, operationsNumber);
 
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+    // check that number of history records corresponds to the expected retain number
+    assertEquals(retainNumber, updatedTables.table().history().size());
+  }
 
-    Table table = mock(Table.class);
-    when(table.location()).thenReturn("/tmp/table");
+  private IcebergTables tables(String name, boolean shouldExpire, int retainNumber) {
+    Config config = baseIcebergConfig(new File(defaultFolder.getRoot(), name))
+      .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+        ConfigValueFactory.fromAnyRef(retainNumber))
+      .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+        ConfigValueFactory.fromAnyRef(shouldExpire));
+    DrillConfig drillConfig = new DrillConfig(config);
+    return (IcebergTables) new IcebergMetastore(drillConfig).tables();
+  }
 
-    assertFalse(expirationHandler.expire(table));
-    assertFalse(expirationHandler.expire(table));
+  private void execute(Tables tables, int operationsNumber) {
+    IntStream.range(0, operationsNumber)
+      .mapToObj(i -> TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("dir" + i)
+        .build())
+      .forEach(table -> tables.modify()
+        .overwrite(table)
+        .execute());
   }
 }