Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/13 21:02:38 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1085][Gobblin-1085] fix compaction initialization

This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ab00f44  [GOBBLIN-1085][Gobblin-1085] fix compaction initialization
ab00f44 is described below

commit ab00f44a50e7fc8b30953dd3d39057fc480d6871
Author: Arjun <ab...@linkedin.com>
AuthorDate: Fri Mar 13 14:02:26 2020 -0700

    [GOBBLIN-1085][Gobblin-1085] fix compaction initialization
    
    fix compaction initialization
    
    address review comments
    
    Closes #2926 from arjun4084346/compactionFix
---
 .../compaction/source/CompactionSource.java        | 54 ++++++++++++----------
 1 file changed, 29 insertions(+), 25 deletions(-)

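For orientation, here is a rough sketch of what CompactionSource.getWorkunitStream looks like after this patch. It is reconstructed from the hunks below rather than copied verbatim from the file, and the inline comments are editorial: the source FileSystem is now obtained up front (it was previously assigned inside initCompactionSource), the method returns an empty work-unit stream as soon as the dataset finder comes back empty, and initCompactionSource(state) runs only when at least one dataset is found, so a run with nothing to compact skips initialization entirely.

      @Override
      public WorkUnitStream getWorkunitStream(SourceState state) {
        try {
          // Resolve the source file system first; the dataset finder needs it,
          // and it is no longer assigned inside initCompactionSource().
          fs = getSourceFileSystem(state);
          DatasetsFinder<Dataset> finder = DatasetUtils.instantiateDatasetFinder(state.getProperties(),
              fs, DefaultFileSystemGlobFinder.class.getName());

          List<Dataset> datasets = finder.findDatasets();
          CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator();

          // Nothing to compact: return an empty stream without initializing.
          if (datasets.size() == 0) {
            return new BasicWorkUnitStream.Builder(workUnitIterator).build();
          }

          // Initialize only when datasets are found.
          initCompactionSource(state);

          // Spawn a single thread to create work units.
          new Thread(new SingleWorkUnitGeneratorService(state, prioritize(datasets, state), workUnitIterator),
              "SingleWorkUnitGeneratorService").start();
          return new BasicWorkUnitStream.Builder(workUnitIterator).build();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
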
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 149bdd0..6b7f551 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -31,6 +31,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -70,21 +86,6 @@ import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
 import org.apache.gobblin.util.request_allocation.ResourceEstimator;
 import org.apache.gobblin.util.request_allocation.ResourcePool;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.gobblin.util.HadoopUtils.getSourceFileSystem;
 
@@ -110,19 +111,23 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
   @Override
   public WorkUnitStream getWorkunitStream(SourceState state) {
     try {
-      initCompactionSource(state);
-
-      DatasetsFinder finder = DatasetUtils.instantiateDatasetFinder(state.getProperties(),
-              getSourceFileSystem(state),
-              DefaultFileSystemGlobFinder.class.getName());
+      fs = getSourceFileSystem(state);
+      DatasetsFinder<Dataset> finder = DatasetUtils.instantiateDatasetFinder(state.getProperties(),
+          fs, DefaultFileSystemGlobFinder.class.getName());
 
       List<Dataset> datasets = finder.findDatasets();
-      CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator ();
+      CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator();
 
-      // Spawn a single thread to create work units
-      new Thread(new SingleWorkUnitGeneratorService (state, prioritize(datasets, state), workUnitIterator), "SingleWorkUnitGeneratorService").start();
-      return new BasicWorkUnitStream.Builder (workUnitIterator).build();
+      if (datasets.size() == 0) {
+        return new BasicWorkUnitStream.Builder(workUnitIterator).build();
+      }
+
+      // initialize iff datasets are found
+      initCompactionSource(state);
 
+      // Spawn a single thread to create work units
+      new Thread(new SingleWorkUnitGeneratorService(state, prioritize(datasets, state), workUnitIterator), "SingleWorkUnitGeneratorService").start();
+      return new BasicWorkUnitStream.Builder(workUnitIterator).build();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -222,7 +227,6 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
    * happening inside {@link #initCompactionSource(SourceState)} is compulsory.
    */
   private void initCompactionSource(SourceState state) throws IOException {
-    fs = getSourceFileSystem(state);
     state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
     suite = CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);