You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2020/02/04 11:53:03 UTC
[drill] branch master updated: DRILL-7544: Upgrade Iceberg version
to support Parquet 1.11.0
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 9f7c58f DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0
9f7c58f is described below
commit 9f7c58fc5a47cfe67e5fb5f5f35971cf534133d5
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed Jan 22 15:55:09 2020 +0200
DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0
1. Upgraded Iceberg to the commit that supports Parquet 1.11.0.
2. Removed the workaround in ExpirationHandler and used the built-in logic from the Iceberg library.
3. Updated the description of the expiration logic in README.md.
---
metastore/iceberg-metastore/README.md | 10 +-
metastore/iceberg-metastore/pom.xml | 12 +-
.../drill/metastore/iceberg/IcebergMetastore.java | 6 +-
.../iceberg/components/tables/IcebergTables.java | 4 +-
.../iceberg/config/IcebergConfigConstants.java | 5 -
.../iceberg/operate/ExpirationHandler.java | 263 +++------------------
.../metastore/iceberg/operate/IcebergModify.java | 5 +-
.../src/main/resources/drill-metastore-module.conf | 14 +-
.../iceberg/operate/TestExpirationHandler.java | 149 +++++-------
9 files changed, 119 insertions(+), 349 deletions(-)
diff --git a/metastore/iceberg-metastore/README.md b/metastore/iceberg-metastore/README.md
index 3db08d1..094154b 100644
--- a/metastore/iceberg-metastore/README.md
+++ b/metastore/iceberg-metastore/README.md
@@ -183,8 +183,8 @@ expiration process is launched.
Iceberg table generates metadata for each modification operation:
snapshot, manifest file, table metadata file. Also when performing delete operation,
previously stored data files are not deleted. These files with the time
-can occupy lots of space. `ExpirationHandler` allows to expire outdated metadata and
-data files after configured time period (`drill.metastore.iceberg.expiration.period`).
-If expiration period is not indicated, zero or negative, expiration won't be performed.
-`ExpirationHandler` is called after each modification operation, it checks if expiration period
-has elapsed and submits expiration process in a separate thread.
+can occupy lots of space. Two table properties `write.metadata.delete-after-commit.enabled`
+and `write.metadata.previous-versions-max` control the expiration process.
+Metadata files will be expired automatically if `write.metadata.delete-after-commit.enabled`
+is enabled. Snapshots and data files will be expired using `ExpirationHandler`
+after each commit operation, based on the values of the same table properties.
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 818ea08..aa68271 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -31,7 +31,7 @@
<name>metastore/Drill Iceberg Metastore</name>
<properties>
- <iceberg.version>0.7.0-incubating</iceberg.version>
+ <iceberg.version>2d75130</iceberg.version>
<caffeine.version>2.7.0</caffeine.version>
</properties>
@@ -47,27 +47,27 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
+ <groupId>com.github.apache.incubator-iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
+ <groupId>com.github.apache.incubator-iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
+ <groupId>com.github.apache.incubator-iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
+ <groupId>com.github.apache.incubator-iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
+ <groupId>com.github.apache.incubator-iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
index e223c5a..94bc6bf 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
@@ -25,7 +25,6 @@ import org.apache.drill.metastore.components.views.Views;
import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
import org.apache.drill.shaded.guava.com.google.common.collect.MapDifference;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -59,7 +58,6 @@ public class IcebergMetastore implements Metastore {
private final org.apache.iceberg.Tables tables;
private final String baseLocation;
private final Map<String, String> commonProperties;
- private final ExpirationHandler expirationHandler;
/**
* Table properties for each Iceberg table should be updated only once,
@@ -77,7 +75,6 @@ public class IcebergMetastore implements Metastore {
this.tables = new HadoopTables(new Configuration(configuration));
this.baseLocation = baseLocation(new Configuration(configuration));
this.commonProperties = properties(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES);
- this.expirationHandler = new ExpirationHandler(config, new Configuration(configuration));
}
@Override
@@ -85,7 +82,7 @@ public class IcebergMetastore implements Metastore {
Table table = loadTable(IcebergConfigConstants.COMPONENTS_TABLES_LOCATION,
IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES,
IcebergTables.SCHEMA, Tables.class);
- return new IcebergTables(table, expirationHandler);
+ return new IcebergTables(table);
}
@Override
@@ -273,6 +270,5 @@ public class IcebergMetastore implements Metastore {
@Override
public void close() {
- expirationHandler.close();
}
}
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index f4d32e9..a28248d 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -58,9 +58,9 @@ public class IcebergTables implements Tables, MetastoreContext<TableMetadataUnit
private final Table table;
private final ExpirationHandler expirationHandler;
- public IcebergTables(Table table, ExpirationHandler expirationHandler) {
+ public IcebergTables(Table table) {
this.table = table;
- this.expirationHandler = expirationHandler;
+ this.expirationHandler = new ExpirationHandler(table);
}
public MetastoreContext<TableMetadataUnit> context() {
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
index 4419f68..6f326ac 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
@@ -52,11 +52,6 @@ public interface IcebergConfigConstants {
String RELATIVE_PATH = LOCATION_NAMESPACE + "relative_path";
/**
- * Defines config which provides expiration period value.
- */
- String EXPIRATION_PERIOD = BASE + "expiration.period";
-
- /**
* Drill Iceberg Metastore components configuration properties namespace.
*/
String COMPONENTS = BASE + "components.";
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
index f06d496..25cd340 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
@@ -17,33 +17,15 @@
*/
package org.apache.drill.metastore.iceberg.operate;
-import com.typesafe.config.ConfigValue;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Iceberg table generates metadata for each modification operation:
@@ -51,226 +33,55 @@ import java.util.regex.Pattern;
* previously stored data files are not deleted. These files with the time
* can occupy lots of space.
* <p/>
- * Expiration handler expires outdated metadata and data files after configured expiration period.
- * Expiration period is set in the Iceberg Metastore config {@link IcebergConfigConstants#EXPIRATION_PERIOD}.
- * Units should correspond to {@link ChronoUnit} values that do not have estimated duration
- * (millis, seconds, minutes, hours, days).
- * If expiration period is not set, zero or negative, expiration process will not be executed.
- * <p/>
- * Expiration process is launched using executor service which allows to execute only one thread at a time,
- * idle thread is not kept in the core pool since it is assumed that expiration process won't be launched to often.
- * <p/>
- * During Drillbit shutdown, if there are expiration tasks in the queue, they will be discarded in order to
- * unblock Drillbit shutdown process.
+ * Table metadata in Iceberg is expired automatically
+ * if {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is set to true.
+ * The number of metadata files to retain is configured using {@link TableProperties#METADATA_PREVIOUS_VERSIONS_MAX}.
+ * Snapshot and data expiration must be triggered manually; to stay aligned with metadata expiration,
+ * the same table properties are used to determine whether expiration is needed and how many
+ * snapshots should be retained.
*/
-public class ExpirationHandler implements AutoCloseable {
+public class ExpirationHandler {
private static final Logger logger = LoggerFactory.getLogger(ExpirationHandler.class);
- private static final Pattern METADATA_VERSION_PATTERN = Pattern.compile("^v([0-9]+)\\..*");
-
- // contains Iceberg table location and its last expiration time
- private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>();
- private final Configuration configuration;
- private final long expirationPeriod;
- private volatile ExecutorService executorService;
-
- public ExpirationHandler(DrillConfig config, Configuration configuration) {
- this.configuration = configuration;
- this.expirationPeriod = expirationPeriod(config);
- logger.debug("Drill Iceberg Metastore expiration period: {}", expirationPeriod);
- }
-
- /**
- * Checks if expiration process needs to be performed for the given Iceberg table
- * by comparing stored last expiration time.
- * If difference between last expiration time and current time is more or equal to
- * expiration period, launches expiration process.
- * If expiration period is zero or negative, no expiration process will be launched.
- *
- * @param table Iceberg table instance
- * @return true if expiration process was launched, false otherwise
- */
- public boolean expire(Table table) {
- if (expirationPeriod <= 0) {
- return false;
- }
-
- long current = System.currentTimeMillis();
- Long last = expirationStatus.putIfAbsent(table.location(), current);
-
- if (last != null && current - last >= expirationPeriod) {
- expirationStatus.put(table.location(), current);
+ private final Table table;
+ private final boolean shouldExpire;
+ private final int retainNumber;
- ExecutorService executorService = executorService();
- executorService.submit(() -> {
- logger.debug("Expiring Iceberg table [{}] metadata", table.location());
- table.expireSnapshots()
- .expireOlderThan(current)
- .commit();
- // TODO: Replace with table metadata expiration through Iceberg API
- // when https://github.com/apache/incubator-iceberg/issues/181 is resolved
- // table.expireTableMetadata().expireOlderThan(current).commit();
- expireTableMetadata(table);
- });
- return true;
- }
- return false;
- }
-
- public long expirationPeriod() {
- return expirationPeriod;
- }
+ public ExpirationHandler(Table table) {
+ this.table = table;
- @Override
- public void close() {
- if (executorService != null) {
- // unlike shutdown(), shutdownNow() discards all queued waiting tasks
- // this is done in order to unblock Drillbit shutdown
- executorService.shutdownNow();
- }
- }
-
- private long expirationPeriod(DrillConfig config) {
- if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) {
- Duration duration = config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream()
- .map(this::duration)
- .reduce(Duration.ZERO, Duration::plus);
- return duration.toMillis();
- }
- return 0;
- }
-
- private Duration duration(Map.Entry<String, ConfigValue> entry) {
- String amountText = String.valueOf(entry.getValue().unwrapped());
- String unitText = entry.getKey().toUpperCase();
- try {
- long amount = Long.parseLong(amountText);
- ChronoUnit unit = ChronoUnit.valueOf(unitText);
- return Duration.of(amount, unit);
- } catch (NumberFormatException e) {
- throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
- "Unable to convert [%s] into long", amountText), e);
- } catch (IllegalArgumentException e) {
- throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
- "Unable to convert [%s] into [%s]", unitText, ChronoUnit.class.getCanonicalName()), e);
- }
+ Map<String, String> properties = table.properties();
+ this.shouldExpire = PropertyUtil.propertyAsBoolean(properties,
+ TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+ TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
+ this.retainNumber = PropertyUtil.propertyAsInt(properties,
+ TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+ TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
}
/**
- * Initializes executor service instance using DCL.
- * Created thread executor instance allows to execute only one thread at a time
- * but unlike single thread executor does not keep this thread in the pool.
- * Custom thread factory is used to define Iceberg Metastore specific thread names.
- *
- * @return executor service instance
+ * Expires snapshots and related data if needed
+ * based on the given table properties values.
*/
- private ExecutorService executorService() {
- if (executorService == null) {
- synchronized (this) {
- if (executorService == null) {
- this.executorService = new ThreadPoolExecutor(0, 1, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new IcebergThreadFactory());
- }
- }
+ public void expire() {
+ if (shouldExpire) {
+ table.expireSnapshots()
+ .expireOlderThan(System.currentTimeMillis())
+ .retainLast(retainNumber)
+ .commit();
}
- return executorService;
}
/**
- * Expires outdated Iceberg table metadata files.
- * Reads current Iceberg table metadata version from version-hint.text file
- * and deletes all metadata files that end with ".metadata.json" and have
- * version less than current.
- * <p/>
- * Should be replaced with
- * <code>table.expireTableMetadata().expireOlderThan(current).commit();</code>
- * when <a href="https://github.com/apache/incubator-iceberg/issues/181">Issue#181</a>
- * is resolved.
- *
- * @param table Iceberg table instance
+ * Expires snapshots and related data and ignores possible exceptions.
*/
- private void expireTableMetadata(Table table) {
+ public void expireQuietly() {
try {
- String location = table.location();
- Path metadata = new Path(location, "metadata");
- FileSystem fs = metadata.getFileSystem(configuration);
- for (FileStatus fileStatus : listExpiredMetadataFiles(fs, metadata)) {
- if (fs.delete(fileStatus.getPath(), false)) {
- logger.debug("Deleted Iceberg table [{}] metadata file [{}]", table.location(), fileStatus.getPath());
- }
- }
- } catch (NumberFormatException | IOException e) {
- logger.warn("Unable to expire Iceberg table [{}] metadata files", table.location(), e);
- }
- }
-
- /**
- * Reads current Iceberg table metadata version from version-hint.text file
- * and returns all metadata files that end with ".metadata.json" and have
- * version less than current.
- *
- * @param fs file system
- * @param metadata pth to Iceberg metadata
- * @return metadata files with version less than current
- * @throws IOException in case of error listing file statuses
- */
- private FileStatus[] listExpiredMetadataFiles(FileSystem fs, Path metadata) throws IOException {
- int currentVersion = currentVersion(fs, metadata);
- return fs.listStatus(metadata, path -> {
- if (path.getName().endsWith(".metadata.json")) {
- int version = parseVersion(path);
- return version != -1 && currentVersion > version;
- }
- return false;
- });
- }
-
- /**
- * Reads current table metadata version from version-hint.text file.
- *
- * @param fs file system
- * @param metadata table metadata path
- * @return current table metadata version
- * @throws IOException if unable to read current table metadata version
- */
- private int currentVersion(FileSystem fs, Path metadata) throws IOException {
- Path versionHintFile = new Path(metadata, "version-hint.text");
- try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(versionHintFile)))) {
- return Integer.parseInt(in.readLine().replace("\n", ""));
- }
- }
-
- /**
- * Extracts metadata version from table metadata file name.
- * Example: v1.metadata.json -> 1, v15.metadata.json -> 15
- *
- * @param path table metadata file path
- * @return metadata version
- */
- private int parseVersion(Path path) {
- Matcher matcher = METADATA_VERSION_PATTERN.matcher(path.getName());
- if (matcher.find() && matcher.groupCount() == 1) {
- return Integer.parseInt(matcher.group(1));
- }
- throw new NumberFormatException("Unable to parse version for path " + path);
- }
-
- /**
- * Wraps default thread factory and adds Iceberg Metastore prefix to the original thread name.
- * Is used to uniquely identify Iceberg metastore threads.
- * Example: drill-iceberg-metastore-pool-1-thread-1
- */
- private static class IcebergThreadFactory implements ThreadFactory {
-
- private static final String THREAD_PREFIX = "drill-iceberg-metastore-";
- private final ThreadFactory delegate = Executors.defaultThreadFactory();
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread thread = delegate.newThread(runnable);
- thread.setName(THREAD_PREFIX + thread.getName());
- return thread;
+ expire();
+ } catch (ValidationException | CommitFailedException e) {
+ logger.warn("Unable to expire snapshots: {}", e.getMessage());
+ logger.debug("Error when expiring snapshots", e);
}
}
}
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index a510f86..e29571f 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -80,7 +80,8 @@ public class IcebergModify<T> implements Modify<T> {
operations.forEach(op -> op.add(transaction));
transaction.commitTransaction();
- // check if Iceberg table metadata needs to be expired after each modification operation
- context.expirationHandler().expire(context.table());
+ // expiration process should not interfere with data modification operations:
+ // if expiration fails, it will be attempted again after the next modification
+ context.expirationHandler().expireQuietly();
}
}
diff --git a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
index 33fe795..de1f1f9 100644
--- a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
+++ b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
@@ -34,18 +34,14 @@ drill.metastore.iceberg: {
relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
}
- // Specifies time period after which outdated Iceberg table metadata will be expired,
- // unit names must correspond to java.time.temporal.ChronoUnit enum values
- // that do not have estimated duration (millis, seconds, minutes, hours, days).
- // Example: hours: 10, minutes: 20
- expiration.period: {
- days: 5
- }
-
components: {
// Common properties for all Iceberg tables from org.apache.iceberg.TableProperties can be specified
common.properties: {
- write.metadata.compression-codec: "none"
+ write.metadata.compression-codec: "none",
+ // Enables metadata expiration to avoid consuming space with historical data
+ // In Drill the same table properties apply to the snapshots expiration process as well
+ write.metadata.delete-after-commit.enabled: true
+ write.metadata.previous-versions-max: 2,
},
tables: {
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
index 90ef55b..d330b3b 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
@@ -17,127 +17,98 @@
*/
package org.apache.drill.metastore.iceberg.operate;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.IcebergMetastore;
+import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class TestExpirationHandler extends IcebergBaseTest {
@Test
- public void testConfigEmpty() {
- ExpirationHandler expirationHandler = new ExpirationHandler(DrillConfig.create(), baseHadoopConfig());
- assertEquals(0, expirationHandler.expirationPeriod());
- }
+ public void testNoExpiration() {
+ IcebergTables tables = tables("no-expiration", false, 2);
- @Test
- public void testConfigOneUnit() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
- ConfigValueFactory.fromAnyRef(5)));
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
- assertEquals(TimeUnit.HOURS.toMillis(5), expirationHandler.expirationPeriod());
- }
+ // check that there is no history
+ assertEquals(0, tables.table().history().size());
- @Test
- public void testConfigSeveralUnits() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
- ConfigValueFactory.fromAnyRef(5))
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".minutes",
- ConfigValueFactory.fromAnyRef(10)));
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
- assertEquals(TimeUnit.HOURS.toMillis(5) + TimeUnit.MINUTES.toMillis(10),
- expirationHandler.expirationPeriod());
- }
+ int operationsNumber = 5;
+ execute(tables, operationsNumber);
- @Test
- public void testConfigNegativeValue() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
- ConfigValueFactory.fromAnyRef(-5)));
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
- assertEquals(TimeUnit.HOURS.toMillis(-5), expirationHandler.expirationPeriod());
+ // check that the number of executed operations is same as number of history records
+ assertEquals(operationsNumber, tables.table().history().size());
}
@Test
- public void testConfigIncorrectUnit() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hour",
- ConfigValueFactory.fromAnyRef(5)));
+ public void testExpiration() {
+ int retainNumber = 3;
+ IcebergTables tables = tables("expiration", true, retainNumber);
- thrown.expect(IcebergMetastoreException.class);
- new ExpirationHandler(config, baseHadoopConfig());
- }
+ // check that there is no history
+ assertEquals(0, tables.table().history().size());
- @Test
- public void testConfigIncorrectValue() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
- ConfigValueFactory.fromAnyRef("abc")));
+ execute(tables, 5);
- thrown.expect(IcebergMetastoreException.class);
- new ExpirationHandler(config, baseHadoopConfig());
+ // check that number of history records corresponds to the expected retain number
+ assertEquals(retainNumber, tables.table().history().size());
}
@Test
- public void testExpireZeroExpirationPeriod() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
- ConfigValueFactory.fromAnyRef(0)));
-
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
- Table table = mock(Table.class);
- assertFalse(expirationHandler.expire(table));
- }
+ public void testSubsequentExpiration() {
+ String name = "subsequent-expiration";
+ int retainNumber = 2;
+ int operationsNumber = 5;
- @Test
- public void testExpireNegativeExpirationPeriod() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
- ConfigValueFactory.fromAnyRef(-10)));
-
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
- Table table = mock(Table.class);
- assertFalse(expirationHandler.expire(table));
- }
+ IcebergTables initialTables = tables(name, false, retainNumber);
- @Test
- public void testExpireFirstTime() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
- ConfigValueFactory.fromAnyRef(1)));
+ execute(initialTables, operationsNumber);
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+ // check that number of executed operations is the same as number of history records
+ assertEquals(operationsNumber, initialTables.table().history().size());
- Table table = mock(Table.class);
- when(table.location()).thenReturn("/tmp/table");
+ // update table configuration, allow expiration
+ IcebergTables updatedTables = tables(name, true, retainNumber);
- assertFalse(expirationHandler.expire(table));
- }
+ // check that the number of history records did not change
+ assertEquals(operationsNumber, updatedTables.table().history().size());
- @Test
- public void testExpireBefore() {
- DrillConfig config = new DrillConfig(DrillConfig.create()
- .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".days",
- ConfigValueFactory.fromAnyRef(1)));
+ execute(updatedTables, operationsNumber);
- ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+ // check that number of history records corresponds to the expected retain number
+ assertEquals(retainNumber, updatedTables.table().history().size());
+ }
- Table table = mock(Table.class);
- when(table.location()).thenReturn("/tmp/table");
+ private IcebergTables tables(String name, boolean shouldExpire, int retainNumber) {
+ Config config = baseIcebergConfig(new File(defaultFolder.getRoot(), name))
+ .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+ ConfigValueFactory.fromAnyRef(retainNumber))
+ .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+ ConfigValueFactory.fromAnyRef(shouldExpire));
+ DrillConfig drillConfig = new DrillConfig(config);
+ return (IcebergTables) new IcebergMetastore(drillConfig).tables();
+ }
- assertFalse(expirationHandler.expire(table));
- assertFalse(expirationHandler.expire(table));
+ private void execute(Tables tables, int operationsNumber) {
+ IntStream.range(0, operationsNumber)
+ .mapToObj(i -> TableMetadataUnit.builder()
+ .storagePlugin("dfs")
+ .workspace("tmp")
+ .tableName("nation")
+ .metadataKey("dir" + i)
+ .build())
+ .forEach(table -> tables.modify()
+ .overwrite(table)
+ .execute());
}
}