You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/03 00:26:04 UTC

[gobblin] branch master updated: set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 80bcfb7  set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
80bcfb7 is described below

commit 80bcfb752a2c8dc15465f70a2ea142482147d3da
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Nov 2 17:25:55 2021 -0700

    set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
---
 .../ManagedIcebergCleanableDatasetFinder.java      | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
index a2684e4..f45b798 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
@@ -17,10 +17,16 @@
 
 package org.apache.gobblin.data.management.retention.profile;
 
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.config.client.ConfigClient;
 import org.apache.gobblin.config.client.ConfigClientCache;
 import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -28,12 +34,10 @@ import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
 import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
 import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
 import org.apache.gobblin.data.management.retention.dataset.ConfigurableCleanableDataset;
+import org.apache.gobblin.data.management.retention.dataset.FsCleanableHelper;
 import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
-import org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.LoggerFactory;
 
 
 public class ManagedIcebergCleanableDatasetFinder extends ManagedCleanableDatasetFinder {
@@ -51,9 +55,13 @@ public class ManagedIcebergCleanableDatasetFinder extends ManagedCleanableDatase
 
   @Override
   public ConfigurableCleanableDataset<FileSystemDatasetVersion> datasetAtPath(Path path) throws IOException {
+    Properties datasetProps = new Properties();
+    datasetProps.putAll(this.props);
+    datasetProps.setProperty(FsCleanableHelper.RETENTION_DATASET_ROOT, path.toString());
+
     try {
-      return new CleanableIcebergDataset<>(this.fs, this.props, path,
-          this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI) + ICEBERG_CONFIG_PREFIX + path.toString()),
+      return new CleanableIcebergDataset<>(this.fs, datasetProps, path,
+          this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI) + ICEBERG_CONFIG_PREFIX + path),
           LoggerFactory.getLogger(CleanableIcebergDataset.class));
     } catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException | URISyntaxException | VersionDoesNotExistException var3) {
       throw new IllegalArgumentException(var3);