You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/18 04:52:53 UTC
incubator-gobblin git commit: [GOBBLIN-586] Added enhancement to
apply retention on remote HDFS
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 8e974ef09 -> f43de8c4d
[GOBBLIN-586] Added enhancement to apply retention on remote HDFS
Closes #2452 from amarnathkarthik/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f43de8c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f43de8c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f43de8c4
Branch: refs/heads/master
Commit: f43de8c4d7dd0fa3521edc03a173f25873ba9814
Parents: 8e974ef
Author: Karthik Amarnath <ka...@linkedin.com>
Authored: Mon Sep 17 21:52:41 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 21:52:41 2018 -0700
----------------------------------------------------------------------
.../data/management/retention/DatasetCleaner.java | 14 +++++++++-----
.../data/management/retention/DatasetCleanerJob.java | 2 +-
2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index d0a0a96..6793f5a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -42,6 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.gobblin.util.AzkabanTags;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
import org.apache.gobblin.data.management.retention.profile.MultiCleanableDatasetFinder;
@@ -56,6 +57,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.RateControlledFileSystem;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.util.WriterUtils;
/**
@@ -86,11 +88,14 @@ public class DatasetCleaner implements Instrumentable, Closeable {
public DatasetCleaner(FileSystem fs, Properties props) throws IOException {
+ State state = new State(props);
+ FileSystem targetFs =
+ props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
this.closer = Closer.create();
try {
- FileSystem optionalRateControlledFs = fs;
+ FileSystem optionalRateControlledFs = targetFs;
if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
- optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(fs,
+ optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(targetFs,
Long.parseLong(props.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
((RateControlledFileSystem) optionalRateControlledFs).startRateControl();
}
@@ -152,7 +157,6 @@ public class DatasetCleaner implements Instrumentable, Closeable {
LOG.info("Successfully cleaned: " + dataset.datasetURN());
Instrumented.markMeter(DatasetCleaner.this.datasetsCleanSuccessMeter);
}
-
});
}
}
@@ -196,8 +200,8 @@ public class DatasetCleaner implements Instrumentable, Closeable {
@Override
public void switchMetricContext(List<Tag<?>> tags) {
- this.metricContext = this.closer
- .register(Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String> absent()));
+ this.metricContext = this.closer.register(
+ Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String>absent()));
this.regenerateMetrics();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
index c4a18cc..fbb6fe5 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java
@@ -54,7 +54,7 @@ public class DatasetCleanerJob extends AbstractJob implements Tool {
@Override
public void run() throws Exception {
if (this.datasetCleaner != null) {
- this.datasetCleaner.clean();
+ this.datasetCleaner.clean();
}
}