You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/27 18:32:36 UTC
[1/2] incubator-gobblin git commit: [GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 54bda2736 -> 5d0e944c3
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
index 6ade09e..182244a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
@@ -24,8 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,14 +36,15 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.FileListUtils;
-
/**
* This class creates the following properties for a single MapReduce job for compaction:
* compaction.topic, compaction.job.input.dir, compaction.job.dest.dir, compaction.job.dest.dir.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 8a0599e..0ab3eab 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
index 5fcff75..7021a9b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
@@ -23,7 +23,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyRecordReader;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,10 +30,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.gobblin.util.AvroUtils;
-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.gobblin.util.AvroUtils;
+
/**
* A subclass of {@link org.apache.avro.mapreduce.AvroKeyRecordReader}. The purpose is to add a constructor
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
index 93d4ed6..5f864cb 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
@@ -17,21 +17,12 @@
package org.apache.gobblin.compaction.mapreduce.avro;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -47,9 +38,6 @@ import org.apache.hadoop.util.VersionInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
-
/**
* A subclass of {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro inputfiles.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 8e508e5..5f2e9fd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -27,16 +27,15 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ObjectNode;
-
import lombok.extern.slf4j.Slf4j;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
index 2fc6c58..f5cac79 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
@@ -17,22 +17,22 @@
package org.apache.gobblin.compaction.parser;
-import com.google.common.base.Joiner;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index f11378f..4e4382b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -16,9 +16,28 @@
*/
package org.apache.gobblin.compaction.source;
+
+import java.io.IOException;
+import java.net.URI;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
import com.google.common.base.Function;
import com.google.common.base.Optional;
@@ -27,19 +46,23 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.data.management.dataset.DatasetUtils;
-import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
import org.apache.gobblin.compaction.suite.CompactionSuite;
+import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.runtime.JobState;
@@ -63,31 +86,9 @@ import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
import org.apache.gobblin.util.request_allocation.RequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
import org.apache.gobblin.util.request_allocation.ResourceEstimator;
import org.apache.gobblin.util.request_allocation.ResourcePool;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* A compaction source derived from {@link Source} which uses {@link DefaultFileSystemGlobFinder} to find all
* {@link Dataset}s. Use {@link CompactionSuite#getDatasetsFinderVerifiers()} to guarantee a given dataset has passed
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
index 0142f6b..7b62671 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
@@ -17,10 +17,20 @@
package org.apache.gobblin.compaction.suite;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
-import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
+import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
@@ -28,14 +38,6 @@ import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.ArrayList;
/**
* A type of {@link CompactionSuite} which implements all components needed for avro file compaction.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index 3c36ba5..1c564a6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -17,19 +17,19 @@
package org.apache.gobblin.compaction.suite;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.mapreduce.MRCompactionTask;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDatasetsFinder;
import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.configuration.State;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-
/**
* This interface provides major components required by {@link org.apache.gobblin.compaction.source.CompactionSource}
* and {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask} flow.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index ebfe0e6..5653281 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -17,9 +17,18 @@
package org.apache.gobblin.compaction.verify;
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.audit.AuditCountClient;
import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -27,12 +36,6 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.ClassAliasResolver;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
/**
* Use {@link AuditCountClient} to retrieve all record count across different tiers
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 67bb63a..a2751b9 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -17,21 +17,22 @@
package org.apache.gobblin.compaction.verify;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Map;
/**
* Compare the source and destination avro records. Determine if a compaction is needed.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index a267ab5..06abd0a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -18,19 +18,21 @@
package org.apache.gobblin.compaction.verify;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
/**
* A simple class which verify current dataset belongs to a specific time range. Will skip to do
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
index 25573f6..68f57a7 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
@@ -16,11 +16,11 @@
*/
package org.apache.gobblin.compaction.verify;
-import org.apache.gobblin.dataset.Dataset;
-
import lombok.AllArgsConstructor;
import lombok.Getter;
+import org.apache.gobblin.dataset.Dataset;
+
/**
* An interface which represents a generic verifier for compaction
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index de95255..e1bc952 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -16,8 +16,24 @@
*/
package org.apache.gobblin.compaction.verify;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -26,19 +42,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.RecordCountProvider;
import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Collection;
/**
* A class helps to calculate, serialize, deserialize record count.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
index 3fed3e5..51fe866 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
@@ -17,20 +17,10 @@
package org.apache.gobblin.compaction.mapreduce;
-import com.google.common.io.Files;
-import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
-import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
-import org.apache.gobblin.compaction.source.CompactionSource;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
-import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
-import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
@@ -43,13 +33,23 @@ import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
+import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
+import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
+import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
@Slf4j
public class MRCompactionTaskTest {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
index 517b609..b677cd3 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
package org.apache.gobblin.compaction.mapreduce;
-import org.apache.gobblin.compaction.dataset.Dataset;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import org.testng.Assert;
+import org.apache.gobblin.compaction.dataset.Dataset;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
index 150eee0..b243a8e 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce.avro;
import java.util.List;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.testng.Assert;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
index 8211710..3d51218 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
@@ -20,26 +20,25 @@ package org.apache.gobblin.compaction.mapreduce.conditions;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatterBuilder;
import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-
-
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
import com.google.common.base.Optional;
-import org.apache.gobblin.compaction.conditions.RecompactionCondition;
+import com.google.common.collect.Lists;
+
import org.apache.gobblin.compaction.conditions.RecompactionCombineCondition;
+import org.apache.gobblin.compaction.conditions.RecompactionCondition;
import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnDuration;
import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnFileCount;
import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index f876811..f9c855a 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -16,19 +16,23 @@
*/
package org.apache.gobblin.compaction.verify;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+
import org.apache.gobblin.compaction.audit.AuditCountClient;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Map;
/**
* Class to test audit count verification logic
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
index df27f9f..3491013 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
@@ -23,12 +23,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -36,6 +30,13 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.DatasetsFinder;
@@ -62,9 +63,11 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
public static final String DATASET_FINDER_PATTERN_KEY = CONFIGURATION_KEY_PREFIX + "dataset.pattern";
public static final String DATASET_FINDER_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.blacklist";
+ public static final String DATASET_FINDER_GLOB_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.glob.blacklist";
protected final Path datasetPattern;
private final Optional<Pattern> blacklist;
+ private final Optional<Pattern> globPatternBlacklist;
private final Path commonRoot;
protected final FileSystem fs;
protected final Properties props;
@@ -86,6 +89,12 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
this.blacklist = Optional.absent();
}
+ if (ConfigUtils.hasNonEmptyPath(config, DATASET_FINDER_GLOB_BLACKLIST_KEY)) {
+ this.globPatternBlacklist = Optional.of(GlobPattern.compile(config.getString(DATASET_FINDER_GLOB_BLACKLIST_KEY)));
+ } else {
+ this.globPatternBlacklist = Optional.absent();
+ }
+
this.fs = fs;
Path tmpDatasetPattern;
@@ -132,6 +141,9 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
if (this.blacklist.isPresent() && this.blacklist.get().matcher(pathToMatch.toString()).find()) {
continue;
}
+ if (this.globPatternBlacklist.isPresent() && this.globPatternBlacklist.get().matcher(pathToMatch.toString()).find()) {
+ continue;
+ }
LOG.info("Found dataset at " + fileStatus.getPath());
datasets.add(datasetAtPath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath())));
}
[2/2] incubator-gobblin git commit: [GOBBLIN-448] Add glob pattern
blacklist in ConfigurableGlobDatasetFinder
Posted by hu...@apache.org.
[GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder
Closes #2322 from yukuai518/blacklist
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d0e944c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d0e944c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d0e944c
Branch: refs/heads/master
Commit: 5d0e944c3facc9814d16fa7588f958f1e763a468
Parents: 54bda27
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Mar 27 11:32:30 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Mar 27 11:32:30 2018 -0700
----------------------------------------------------------------------
.../cluster/ClusterEventMetadataGenerator.java | 1 -
.../gobblin/cluster/GobblinClusterManager.java | 21 +++----
.../gobblin/cluster/GobblinClusterUtils.java | 11 ++--
.../apache/gobblin/cluster/GobblinHelixJob.java | 7 +--
.../cluster/GobblinHelixJobLauncher.java | 8 +--
.../cluster/GobblinHelixJobScheduler.java | 19 +++----
.../cluster/GobblinHelixMessagingService.java | 10 ++--
.../gobblin/cluster/GobblinHelixTask.java | 1 -
.../cluster/GobblinHelixTaskStateTracker.java | 2 -
.../gobblin/cluster/GobblinTaskRunner.java | 14 ++---
.../cluster/GobblinTaskRunnerMetrics.java | 9 ---
.../gobblin/cluster/GobblinTaskStateModel.java | 1 -
.../cluster/GobblinTaskStateModelFactory.java | 1 -
.../cluster/JobConfigurationManager.java | 3 +-
.../ScheduledJobConfigurationManager.java | 13 +++--
.../gobblin/cluster/SingleTaskLauncher.java | 4 +-
.../StreamingJobConfigurationManager.java | 13 +++--
.../gobblin/cluster/GobblinClusterKillTest.java | 15 +++--
.../cluster/GobblinClusterUtilsTest.java | 14 +++--
.../cluster/GobblinHelixJobLauncherTest.java | 12 ++--
.../gobblin/cluster/GobblinHelixTaskTest.java | 8 +--
.../apache/gobblin/cluster/HelixUtilsTest.java | 1 -
.../cluster/JobConfigurationManagerTest.java | 1 +
.../gobblin/cluster/SingleTaskLauncherTest.java | 4 +-
.../org/apache/gobblin/cluster/TestHelper.java | 2 -
.../TestShutdownMessageHandlerFactory.java | 1 -
.../compaction/ReflectionCompactorFactory.java | 4 +-
.../action/CompactionCompleteAction.java | 5 +-
.../CompactionCompleteFileOperationAction.java | 30 +++++-----
.../CompactionHiveRegistrationAction.java | 23 ++++----
.../action/CompactionMarkDirectoryAction.java | 26 ++++-----
.../audit/KafkaAuditCountHttpClient.java | 26 +++++----
.../audit/PinotAuditCountHttpClient.java | 26 +++++----
.../RecompactionCombineCondition.java | 4 +-
.../RecompactionConditionBasedOnDuration.java | 11 ++--
.../RecompactionConditionBasedOnFileCount.java | 3 +-
.../RecompactionConditionBasedOnRatio.java | 13 ++---
.../gobblin/compaction/dataset/Dataset.java | 8 +--
.../compaction/dataset/DatasetHelper.java | 4 +-
.../compaction/dataset/DatasetsFinder.java | 4 +-
.../dataset/TimeBasedSubDirDatasetsFinder.java | 19 ++++---
.../HiveRegistrationCompactorListener.java | 2 +-
.../CompactionLauncherWriter.java | 7 ++-
.../CompactionLauncherWriterBuilder.java | 2 +
.../HiveMetadataForCompactionExtractor.java | 8 ++-
...veMetadataForCompactionExtractorFactory.java | 8 ++-
.../hivebasedconstructs/MRCompactionEntity.java | 2 +
.../ReflectionCompactorListenerFactory.java | 4 +-
.../SimpleCompactorCompletionListener.java | 3 +-
.../CompactionAvroJobConfigurator.java | 60 +++++++++++---------
.../compaction/mapreduce/MRCompactionTask.java | 16 ++++--
.../mapreduce/MRCompactionTaskFactory.java | 1 -
.../compaction/mapreduce/MRCompactor.java | 26 ++++-----
.../mapreduce/MRCompactorJobPropCreator.java | 5 +-
.../mapreduce/MRCompactorJobRunner.java | 3 -
.../avro/AvroKeyCombineFileRecordReader.java | 5 +-
.../AvroKeyRecursiveCombineFileInputFormat.java | 12 ----
.../FieldAttributeBasedDeltaFieldsProvider.java | 7 +--
.../compaction/parser/CompactionPathParser.java | 6 +-
.../compaction/source/CompactionSource.java | 57 ++++++++++---------
.../compaction/suite/CompactionAvroSuite.java | 20 ++++---
.../compaction/suite/CompactionSuite.java | 14 ++---
.../verify/CompactionAuditCountVerifier.java | 17 +++---
.../verify/CompactionThresholdVerifier.java | 15 ++---
.../verify/CompactionTimeRangeVerifier.java | 16 +++---
.../compaction/verify/CompactionVerifier.java | 4 +-
.../verify/InputRecordCountHelper.java | 29 +++++-----
.../mapreduce/MRCompactionTaskTest.java | 36 ++++++------
.../mapreduce/RenameSourceDirectoryTest.java | 18 +++---
.../avro/ConfBasedDeltaFieldProviderTest.java | 1 -
.../conditions/RecompactionConditionTest.java | 11 ++--
.../verify/PinotAuditCountVerifierTest.java | 16 ++++--
.../profile/ConfigurableGlobDatasetFinder.java | 24 ++++++--
73 files changed, 435 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 6aeb89c..842e285 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -23,7 +23,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.runtime.EventMetadataUtils;
import org.apache.gobblin.runtime.JobContext;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index d57c61e..1592cf8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -36,13 +35,6 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMetric;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -67,8 +59,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -80,9 +72,17 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.app.ApplicationException;
@@ -93,9 +93,6 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
/**
* The central cluster manager for Gobblin Clusters.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 6b6ead8..41f926e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,19 +17,20 @@
package org.apache.gobblin.cluster;
-import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
@Alpha
@Slf4j
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index bc9f88f..db34e3e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -20,20 +20,19 @@ package org.apache.gobblin.cluster;
import java.util.Properties;
import java.util.concurrent.Future;
-import lombok.extern.slf4j.Slf4j;
-
-
import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
-import org.quartz.UnableToInterruptJobException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index d502462..5035216 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -25,8 +25,6 @@ import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,6 +45,8 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+import javax.annotation.Nullable;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.StateStore;
@@ -55,11 +55,13 @@ import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.rest.LauncherTypeEnum;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.ExecutionModel;
+import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -69,8 +71,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
-import javax.annotation.Nullable;
-
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index ef12162..e539273 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -30,8 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PropertiesUtils;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
@@ -42,31 +40,32 @@ import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.api.JobExecutionLauncher;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.AbstractJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
-
-
-import javax.annotation.Nonnull;
-import lombok.Getter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index cdced1b..4101f17 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -16,29 +16,29 @@
*/
package org.apache.gobblin.cluster;
-import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import java.util.UUID;
import java.util.regex.Pattern;
+
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
import org.apache.helix.messaging.CriteriaEvaluator;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.ZNRecordRow;
-import org.apache.helix.PropertyKey;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index c6b9514..e651b8e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -35,7 +35,6 @@ import com.google.common.io.Closer;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
index 79df1fa..7120bef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
@@ -22,8 +22,6 @@ import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
-import org.apache.helix.HelixManager;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 3ec40dc..e68774d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -36,11 +36,6 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -74,9 +69,15 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.services.JMXReportingService;
@@ -86,9 +87,6 @@ import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
index 51e8b36..6435ff4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
@@ -17,17 +17,8 @@
package org.apache.gobblin.cluster;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import com.codahale.metrics.Metric;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.TaskExecutor;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
index 5a96e48..cc4a215 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
@@ -25,7 +25,6 @@ import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModel;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.TaskExecutor;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
index 335a1e0..3603571 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
@@ -26,7 +26,6 @@ import org.apache.helix.task.TaskStateModel;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.TaskExecutor;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 42fab27..d60540c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.util.List;
import java.util.Properties;
-import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +29,8 @@ import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import javax.annotation.Nullable;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index 0f2d356..00c7b0d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -27,23 +27,24 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.api.SpecExecutor;
@Alpha
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
index 8cbbc00..3a0c780 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.text.StrTokenizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.util.GobblinProcessBuilder;
import org.apache.gobblin.util.SystemPropertiesWrapper;
-import com.typesafe.config.Config;
-
import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.CLUSTER_CONFIG_FILE_PATH;
import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.JOB_ID;
import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.WORK_UNIT_FILE_PATH;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index 3c01704..b6985a1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -25,27 +25,28 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-
-import lombok.Getter;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
index ad19135..11b8808 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
@@ -17,11 +17,6 @@
package org.apache.gobblin.cluster;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -31,9 +26,9 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
+
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
@@ -43,6 +38,14 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
/**
* Unit tests for killing {@link GobblinClusterManager}s and {@link GobblinTaskRunner}s
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 4d83658..9288d42 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -17,18 +17,20 @@
package org.apache.gobblin.cluster;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
public class GobblinClusterUtilsTest {
FileSystem fs = mock(FileSystem.class);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 10ef3db..ef5fc4f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -29,10 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.runtime.JobContext;
-import org.apache.gobblin.runtime.listeners.AbstractJobListener;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,16 +52,20 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import lombok.Getter;
-
/**
* Unit tests for {@link GobblinHelixJobLauncher}.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 1cbd72e..a104b7d 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -17,26 +17,20 @@
package org.apache.gobblin.cluster;
-import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.metastore.FsStateStore;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskResult;
-
import org.mockito.Mockito;
-
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -44,10 +38,12 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index 2afd01f..d7841a8 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
-
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
index 73098f2..9e36d8c 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
@@ -39,6 +39,7 @@ import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.configuration.ConfigurationKeys;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
index afa933d..3c56428 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
@@ -25,11 +25,11 @@ import java.util.List;
import org.testng.annotations.Test;
+import com.typesafe.config.ConfigFactory;
+
import org.apache.gobblin.util.GobblinProcessBuilder;
import org.apache.gobblin.util.SystemPropertiesWrapper;
-import com.typesafe.config.ConfigFactory;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
index b717585..e8b32ab 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
@@ -26,12 +26,10 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
-
import org.testng.Assert;
import com.google.common.io.Closer;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
index c0b80a9..cd805bf 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
@@ -25,7 +25,6 @@ import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
-
import org.testng.Assert;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
index fc68e47..1ba6fac 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
@@ -20,11 +20,11 @@ package org.apache.gobblin.compaction;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-
import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.metrics.Tag;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
index a5b21bc..8c5979c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
@@ -16,11 +16,12 @@
*/
package org.apache.gobblin.compaction.action;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.metrics.event.EventSubmitter;
import java.io.IOException;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
/**
* An interface which represents an action that is invoked after a compaction job is finished.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index a21ca93..831443b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -17,9 +17,21 @@
package org.apache.gobblin.compaction.action;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -33,20 +45,6 @@ import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index 7792a50..a7536d3 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -17,6 +17,18 @@
package org.apache.gobblin.compaction.action;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -29,17 +41,6 @@ import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
/**
* Class responsible for hive registration after compaction is complete
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index b504996..89be94a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -17,6 +17,19 @@
package org.apache.gobblin.compaction.action;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -25,19 +38,6 @@ import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
@Slf4j
@AllArgsConstructor
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
index 7f6fb68..04988fe 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
@@ -17,15 +17,11 @@
package org.apache.gobblin.compaction.audit;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.ThreadSafe;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
/**
* A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
index 0e06606..5889edd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
@@ -17,15 +17,11 @@
package org.apache.gobblin.compaction.audit;
-import com.google.api.client.util.Charsets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.gobblin.configuration.State;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
-import javax.annotation.concurrent.ThreadSafe;
-import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
-import java.io.IOException;
-import java.net.URLEncoder;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.api.client.util.Charsets;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
/**
* A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
index 811be5b..a7731f6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
@@ -17,17 +17,17 @@
package org.apache.gobblin.compaction.conditions;
-
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.collect.ImmutableList;
import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
index c432ed2..3704fab 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.compaction.conditions;
-
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.format.PeriodFormatter;
@@ -32,6 +26,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
index b8817fe..9e8e25b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
@@ -16,13 +16,14 @@
*/
package org.apache.gobblin.compaction.conditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.dataset.Dataset;
/**
* An implementation {@link RecompactionCondition} which examines the number of files in the late outputDir
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
index d58a17d..e2bcdb4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
@@ -17,16 +17,9 @@
package org.apache.gobblin.compaction.conditions;
-
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.util.DatasetFilterUtils;
-
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +27,12 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.util.DatasetFilterUtils;
+
/**
* An implementation {@link RecompactionCondition} which examines the late record percentage.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
index 73f12a9..36ec939 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
@@ -22,10 +22,6 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
@@ -33,6 +29,10 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
index 2eede65..ca05822 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
@@ -26,14 +26,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.joda.time.DateTimeZone;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.base.Optional;
import org.apache.gobblin.compaction.conditions.RecompactionCondition;
import org.apache.gobblin.compaction.conditions.RecompactionConditionFactory;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
index cd56460..6ed04af 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
@@ -17,17 +17,14 @@
package org.apache.gobblin.compaction.dataset;
-import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +36,7 @@ import com.google.common.collect.Lists;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.HadoopUtils;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
index b00d7f4..111fe75 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
@@ -17,12 +17,9 @@
package org.apache.gobblin.compaction.dataset;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.DatasetFilterUtils;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.Set;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +32,14 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
-import java.io.IOException;
-import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.DatasetFilterUtils;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
index 3eec1d5..d6b5d81 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
@@ -19,8 +19,8 @@ package org.apache.gobblin.compaction.hive.registration;
import java.util.Properties;
-import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
index c6a66c6..43c5fbc 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
@@ -21,15 +21,18 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+
import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Joiner;
+
import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
index d108343..156cc54 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.compaction.hivebasedconstructs;
import java.io.IOException;
+
import org.apache.avro.Schema;
+
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
index c5b817e..020da2d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
@@ -19,20 +19,24 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
import java.io.IOException;
import java.util.List;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
+
import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.util.AutoReturnableObject;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
-import lombok.extern.slf4j.Slf4j;
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
index 27958b5..ab89df5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
@@ -17,14 +17,16 @@
package org.apache.gobblin.compaction.hivebasedconstructs;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
import java.io.IOException;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
+
/**
* Factory for {@link HiveMetadataForCompactionExtractor}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
index 2d721d0..1f17955 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
import java.util.List;
import java.util.Properties;
+
import org.apache.hadoop.fs.Path;
+
import lombok.Getter;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
index e9a1fca..b780c2c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
@@ -21,12 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-
import org.apache.gobblin.configuration.State;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
index b4365bc..e82bb33 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
@@ -18,12 +18,13 @@
package org.apache.gobblin.compaction.listeners;
import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index b8f407f..c9b7708 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,25 +17,16 @@
package org.apache.gobblin.compaction.mapreduce;
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.mapreduce.avro.*;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
@@ -52,15 +43,30 @@ import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
+import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
/**
* A configurator that focused on creating avro compaction map-reduce job
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index be71804..78ed1c2 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -17,6 +17,16 @@
package org.apache.gobblin.compaction.mapreduce;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -27,13 +37,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.mapreduce.MRTask;
-import java.util.List;
-import java.io.IOException;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.mapreduce.Job;
-import com.google.common.collect.ImmutableMap;
/**
* Customized task of type {@link MRTask}, which runs MR job to compact dataset.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
index ff8b012..9d5b749 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
@@ -18,7 +18,6 @@ package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
-
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.mapreduce.MRTaskFactory;
import org.apache.gobblin.runtime.task.TaskIFace;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 9e53ef4..953c0dc 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -17,13 +17,6 @@
package org.apache.gobblin.compaction.mapreduce;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED;
-import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED;
-import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -36,9 +29,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-import java.lang.reflect.InvocationTargetException;
-import org.joda.time.DateTime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -66,13 +57,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.gobblin.compaction.Compactor;
-import org.apache.gobblin.compaction.listeners.CompactorCompletionListener;
-import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory;
-import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.dataset.DatasetsFinder;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.compaction.listeners.CompactorCompletionListener;
+import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory;
+import org.apache.gobblin.compaction.listeners.CompactorListener;
import org.apache.gobblin.compaction.verify.DataCompletenessVerifier;
import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -81,15 +72,22 @@ import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED;
+import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED;
+import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;
+
/**
* MapReduce-based {@link org.apache.gobblin.compaction.Compactor}. Compaction will run on each qualified {@link Dataset}
* under {@link #COMPACTION_INPUT_DIR}.