You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/15 17:10:10 UTC

[gobblin] branch master updated: [GOBBLIN-1472] toggle to control compaction MR output dir

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ad35f  [GOBBLIN-1472] toggle to control compaction MR output dir
e0ad35f is described below

commit e0ad35f62a938d984051fb6d8d43829774395299
Author: vbohra <vb...@linkedin.com>
AuthorDate: Tue Jun 15 10:10:00 2021 -0700

    [GOBBLIN-1472] toggle to control compaction MR output dir
    
    Closes #3311 from vikrambohra/compactionAzureFix
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java     | 5 +++--
 .../gobblin/compaction/mapreduce/CompactionJobConfigurator.java      | 5 +++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index ed4ff00..851da65 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1074,7 +1074,8 @@ public class ConfigurationKeys {
   public static final String USE_DATASET_LOCAL_WORK_DIR = "gobblin.useDatasetLocalWorkDir";
   public static final String DESTINATION_DATASET_HANDLER_CLASS = "gobblin.destination.datasetHandlerClass";
   public static final String DATASET_DESTINATION_PATH = "gobblin.dataset.destination.path";
-  public static final String STAGING_DIR_DEFAULT_SUFFIX = "/.temp/taskStaging";
-  public static final String OUTPUT_DIR_DEFAULT_SUFFIX = "/.temp/taskOutput";
+  public static final String TMP_DIR = ".temp";
+  public static final String STAGING_DIR_DEFAULT_SUFFIX = "/" + TMP_DIR + "/taskStaging";
+  public static final String OUTPUT_DIR_DEFAULT_SUFFIX = "/" + TMP_DIR + "/taskOutput";
   public static final String ROW_LEVEL_ERR_FILE_DEFAULT_SUFFIX = "/err";
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index 8e492c3..d57c5de 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -249,6 +249,11 @@ public abstract class CompactionJobConfigurator {
     CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);
     this.mrOutputPath = concatPaths(mrOutputBase, rst.getDatasetName(), rst.getDstSubDir(), rst.getTimeString());
 
+    if(this.state.contains(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR)) {
+      mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_DEST_DIR);
+      this.mrOutputPath = concatPaths(mrOutputBase, rst.getDatasetName(),
+          ConfigurationKeys.TMP_DIR, rst.getDstSubDir(), rst.getTimeString());
+    }
     log.info("Cleaning temporary MR output directory: " + mrOutputPath);
     this.fs.delete(mrOutputPath, true);