You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/03 00:26:04 UTC
[gobblin] branch master updated: set RETENTION_DATASET_ROOT in
CleanableIcebergDataset so that any retention job can use this information
(#3422)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 80bcfb7 set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
80bcfb7 is described below
commit 80bcfb752a2c8dc15465f70a2ea142482147d3da
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Nov 2 17:25:55 2021 -0700
set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
---
.../ManagedIcebergCleanableDatasetFinder.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
index a2684e4..f45b798 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
@@ -17,10 +17,16 @@
package org.apache.gobblin.data.management.retention.profile;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -28,12 +34,10 @@ import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
import org.apache.gobblin.data.management.retention.dataset.ConfigurableCleanableDataset;
+import org.apache.gobblin.data.management.retention.dataset.FsCleanableHelper;
import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
-import org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.LoggerFactory;
public class ManagedIcebergCleanableDatasetFinder extends ManagedCleanableDatasetFinder {
@@ -51,9 +55,13 @@ public class ManagedIcebergCleanableDatasetFinder extends ManagedCleanableDatase
@Override
public ConfigurableCleanableDataset<FileSystemDatasetVersion> datasetAtPath(Path path) throws IOException {
+ Properties datasetProps = new Properties();
+ datasetProps.putAll(this.props);
+ datasetProps.setProperty(FsCleanableHelper.RETENTION_DATASET_ROOT, path.toString());
+
try {
- return new CleanableIcebergDataset<>(this.fs, this.props, path,
- this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI) + ICEBERG_CONFIG_PREFIX + path.toString()),
+ return new CleanableIcebergDataset<>(this.fs, datasetProps, path,
+ this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI) + ICEBERG_CONFIG_PREFIX + path),
LoggerFactory.getLogger(CleanableIcebergDataset.class));
} catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException | URISyntaxException | VersionDoesNotExistException var3) {
throw new IllegalArgumentException(var3);