You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/18 04:52:53 UTC

incubator-gobblin git commit: [GOBBLIN-586] Added enhancement to apply retention on remote HDFS

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8e974ef09 -> f43de8c4d


[GOBBLIN-586] Added enhancement to apply retention on remote HDFS

Closes #2452 from amarnathkarthik/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f43de8c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f43de8c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f43de8c4

Branch: refs/heads/master
Commit: f43de8c4d7dd0fa3521edc03a173f25873ba9814
Parents: 8e974ef
Author: Karthik Amarnath <ka...@linkedin.com>
Authored: Mon Sep 17 21:52:41 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 21:52:41 2018 -0700

----------------------------------------------------------------------
 .../data/management/retention/DatasetCleaner.java     | 14 +++++++++-----
 .../data/management/retention/DatasetCleanerJob.java  |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index d0a0a96..6793f5a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -42,6 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 import org.apache.gobblin.util.AzkabanTags;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
 import org.apache.gobblin.data.management.retention.profile.MultiCleanableDatasetFinder;
@@ -56,6 +57,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.RateControlledFileSystem;
 import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.util.WriterUtils;
 
 
 /**
@@ -86,11 +88,14 @@ public class DatasetCleaner implements Instrumentable, Closeable {
 
   public DatasetCleaner(FileSystem fs, Properties props) throws IOException {
 
+    State state = new State(props);
+    FileSystem targetFs =
+        props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
     this.closer = Closer.create();
     try {
-      FileSystem optionalRateControlledFs = fs;
+      FileSystem optionalRateControlledFs = targetFs;
       if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
-        optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(fs,
+        optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(targetFs,
             Long.parseLong(props.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
         ((RateControlledFileSystem) optionalRateControlledFs).startRateControl();
       }
@@ -152,7 +157,6 @@ public class DatasetCleaner implements Instrumentable, Closeable {
           LOG.info("Successfully cleaned: " + dataset.datasetURN());
           Instrumented.markMeter(DatasetCleaner.this.datasetsCleanSuccessMeter);
         }
-
       });
     }
   }
@@ -196,8 +200,8 @@ public class DatasetCleaner implements Instrumentable, Closeable {
 
   @Override
   public void switchMetricContext(List<Tag<?>> tags) {
-    this.metricContext = this.closer
-        .register(Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String> absent()));
+    this.metricContext = this.closer.register(
+        Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String>absent()));
     this.regenerateMetrics();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
index c4a18cc..fbb6fe5 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
@@ -54,7 +54,7 @@ public class DatasetCleanerJob extends AbstractJob implements Tool {
   @Override
   public void run() throws Exception {
     if (this.datasetCleaner != null) {
-        this.datasetCleaner.clean();
+      this.datasetCleaner.clean();
     }
   }