You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ku...@apache.org on 2020/03/13 21:02:38 UTC
[incubator-gobblin] branch master updated:
[GOBBLIN-1085][Gobblin-1085] fix compaction initialization
This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ab00f44 [GOBBLIN-1085][Gobblin-1085] fix compaction initialization
ab00f44 is described below
commit ab00f44a50e7fc8b30953dd3d39057fc480d6871
Author: Arjun <ab...@linkedin.com>
AuthorDate: Fri Mar 13 14:02:26 2020 -0700
[GOBBLIN-1085][Gobblin-1085] fix compaction initialization
Fix compaction initialization: initialize the compaction source only when datasets are found.
Address review comments.
Closes #2926 from arjun4084346/compactionFix
---
.../compaction/source/CompactionSource.java | 54 ++++++++++++----------
1 file changed, 29 insertions(+), 25 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 149bdd0..6b7f551 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -31,6 +31,22 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -70,21 +86,6 @@ import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
import org.apache.gobblin.util.request_allocation.ResourceEstimator;
import org.apache.gobblin.util.request_allocation.ResourcePool;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import static org.apache.gobblin.util.HadoopUtils.getSourceFileSystem;
@@ -110,19 +111,23 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
@Override
public WorkUnitStream getWorkunitStream(SourceState state) {
try {
- initCompactionSource(state);
-
- DatasetsFinder finder = DatasetUtils.instantiateDatasetFinder(state.getProperties(),
- getSourceFileSystem(state),
- DefaultFileSystemGlobFinder.class.getName());
+ fs = getSourceFileSystem(state);
+ DatasetsFinder<Dataset> finder = DatasetUtils.instantiateDatasetFinder(state.getProperties(),
+ fs, DefaultFileSystemGlobFinder.class.getName());
List<Dataset> datasets = finder.findDatasets();
- CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator ();
+ CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator();
- // Spawn a single thread to create work units
- new Thread(new SingleWorkUnitGeneratorService (state, prioritize(datasets, state), workUnitIterator), "SingleWorkUnitGeneratorService").start();
- return new BasicWorkUnitStream.Builder (workUnitIterator).build();
+ if (datasets.size() == 0) {
+ return new BasicWorkUnitStream.Builder(workUnitIterator).build();
+ }
+
+ // initialize iff datasets are found
+ initCompactionSource(state);
+ // Spawn a single thread to create work units
+ new Thread(new SingleWorkUnitGeneratorService(state, prioritize(datasets, state), workUnitIterator), "SingleWorkUnitGeneratorService").start();
+ return new BasicWorkUnitStream.Builder(workUnitIterator).build();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -222,7 +227,6 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
* happening inside {@link #initCompactionSource(SourceState)} is compulsory.
*/
private void initCompactionSource(SourceState state) throws IOException {
- fs = getSourceFileSystem(state);
state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
suite = CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);