You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/27 18:32:36 UTC

[1/2] incubator-gobblin git commit: [GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 54bda2736 -> 5d0e944c3


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
index 6ade09e..182244a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,14 +36,15 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.FileListUtils;
 
-
 /**
  * This class creates the following properties for a single MapReduce job for compaction:
  * compaction.topic, compaction.job.input.dir, compaction.job.dest.dir, compaction.job.dest.dir.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 8a0599e..0ab3eab 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.math3.primes.Primes;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.joda.time.DateTime;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
index 5fcff75..7021a9b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
@@ -23,7 +23,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyRecordReader;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,10 +30,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-import org.apache.gobblin.util.AvroUtils;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
+import org.apache.gobblin.util.AvroUtils;
+
 
 /**
  * A subclass of {@link org.apache.avro.mapreduce.AvroKeyRecordReader}. The purpose is to add a constructor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
index 93d4ed6..5f864cb 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
@@ -17,21 +17,12 @@
 
 package org.apache.gobblin.compaction.mapreduce.avro;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -47,9 +38,6 @@ import org.apache.hadoop.util.VersionInfo;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
-
 
 /**
  * A subclass of {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro inputfiles.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 8e508e5..5f2e9fd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -27,16 +27,15 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ObjectNode;
-
 import lombok.extern.slf4j.Slf4j;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
index 2fc6c58..f5cac79 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
@@ -17,22 +17,22 @@
 
 package org.apache.gobblin.compaction.parser;
 
-import com.google.common.base.Joiner;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index f11378f..4e4382b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -16,9 +16,28 @@
  */
 
 package org.apache.gobblin.compaction.source;
+
+import java.io.IOException;
+import java.net.URI;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -27,19 +46,23 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.data.management.dataset.DatasetUtils;
-import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
+import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.DatasetsFinder;
 import org.apache.gobblin.runtime.JobState;
@@ -63,31 +86,9 @@ import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
 import org.apache.gobblin.util.request_allocation.RequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
 import org.apache.gobblin.util.request_allocation.ResourceEstimator;
 import org.apache.gobblin.util.request_allocation.ResourcePool;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * A compaction source derived from {@link Source} which uses {@link DefaultFileSystemGlobFinder} to find all
  * {@link Dataset}s. Use {@link CompactionSuite#getDatasetsFinderVerifiers()} to guarantee a given dataset has passed

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
index 0142f6b..7b62671 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
@@ -17,10 +17,20 @@
 
 package org.apache.gobblin.compaction.suite;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
-import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
 import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
+import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
 import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
@@ -28,14 +38,6 @@ import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.ArrayList;
 
 /**
  * A type of {@link CompactionSuite} which implements all components needed for avro file compaction.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index 3c36ba5..1c564a6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -17,19 +17,19 @@
 
 package org.apache.gobblin.compaction.suite;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.mapreduce.MRCompactionTask;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.replication.ConfigBasedDatasetsFinder;
 import org.apache.gobblin.dataset.Dataset;
 
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.configuration.State;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-
 /**
  * This interface provides major components required by {@link org.apache.gobblin.compaction.source.CompactionSource}
  * and {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask} flow.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index ebfe0e6..5653281 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -17,9 +17,18 @@
 
 package org.apache.gobblin.compaction.verify;
 
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -27,12 +36,6 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.ClassAliasResolver;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
 
 /**
  * Use {@link AuditCountClient} to retrieve all record count across different tiers

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 67bb63a..a2751b9 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -17,21 +17,22 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Map;
 
 /**
  * Compare the source and destination avro records. Determine if a compaction is needed.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index a267ab5..06abd0a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -18,19 +18,21 @@
 package org.apache.gobblin.compaction.verify;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
 
 /**
  * A simple class which verify current dataset belongs to a specific time range. Will skip to do

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
index 25573f6..68f57a7 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
@@ -16,11 +16,11 @@
  */
 package org.apache.gobblin.compaction.verify;
 
-import org.apache.gobblin.dataset.Dataset;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
+import org.apache.gobblin.dataset.Dataset;
+
 
 /**
  * An interface which represents a generic verifier for compaction

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index de95255..e1bc952 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -16,8 +16,24 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -26,19 +42,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.RecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Collection;
 
 /**
  * A class helps to calculate, serialize, deserialize record count.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
index 3fed3e5..51fe866 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
@@ -17,20 +17,10 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.io.Files;
-import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
-import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
-import org.apache.gobblin.compaction.source.CompactionSource;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
-import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
-import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
@@ -43,13 +33,23 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
+import com.google.common.io.Files;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
+import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
+import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
 
 @Slf4j
 public class MRCompactionTaskTest {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
index 517b609..b677cd3 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 package org.apache.gobblin.compaction.mapreduce;
-import org.apache.gobblin.compaction.dataset.Dataset;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import org.testng.Assert;
 
+import org.apache.gobblin.compaction.dataset.Dataset;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
index 150eee0..b243a8e 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce.avro;
 
 import java.util.List;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.testng.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
index 8211710..3d51218 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
@@ -20,26 +20,25 @@ package org.apache.gobblin.compaction.mapreduce.conditions;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatterBuilder;
 import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-
-
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
-import org.apache.gobblin.compaction.conditions.RecompactionCondition;
+import com.google.common.collect.Lists;
+
 import org.apache.gobblin.compaction.conditions.RecompactionCombineCondition;
+import org.apache.gobblin.compaction.conditions.RecompactionCondition;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnDuration;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnFileCount;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index f876811..f9c855a 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -16,19 +16,23 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+
 import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Map;
 
 /**
  * Class to test audit count verification logic

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
index df27f9f..3491013 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
@@ -23,12 +23,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -36,6 +30,13 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.DatasetsFinder;
@@ -62,9 +63,11 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
 
   public static final String DATASET_FINDER_PATTERN_KEY = CONFIGURATION_KEY_PREFIX + "dataset.pattern";
   public static final String DATASET_FINDER_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.blacklist";
+  public static final String DATASET_FINDER_GLOB_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.glob.blacklist";
 
   protected final Path datasetPattern;
   private final Optional<Pattern> blacklist;
+  private final Optional<Pattern> globPatternBlacklist;
   private final Path commonRoot;
   protected final FileSystem fs;
   protected final Properties props;
@@ -86,6 +89,12 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
       this.blacklist = Optional.absent();
     }
 
+    if (ConfigUtils.hasNonEmptyPath(config, DATASET_FINDER_GLOB_BLACKLIST_KEY)) {
+      this.globPatternBlacklist = Optional.of(GlobPattern.compile(config.getString(DATASET_FINDER_GLOB_BLACKLIST_KEY)));
+    } else {
+      this.globPatternBlacklist = Optional.absent();
+    }
+
     this.fs = fs;
 
     Path tmpDatasetPattern;
@@ -132,6 +141,9 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
         if (this.blacklist.isPresent() && this.blacklist.get().matcher(pathToMatch.toString()).find()) {
           continue;
         }
+        if (this.globPatternBlacklist.isPresent() && this.globPatternBlacklist.get().matcher(pathToMatch.toString()).find()) {
+          continue;
+        }
         LOG.info("Found dataset at " + fileStatus.getPath());
         datasets.add(datasetAtPath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath())));
       }


[2/2] incubator-gobblin git commit: [GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder

Posted by hu...@apache.org.
[GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder

Closes #2322 from yukuai518/blacklist


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d0e944c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d0e944c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d0e944c

Branch: refs/heads/master
Commit: 5d0e944c3facc9814d16fa7588f958f1e763a468
Parents: 54bda27
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Mar 27 11:32:30 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Mar 27 11:32:30 2018 -0700

----------------------------------------------------------------------
 .../cluster/ClusterEventMetadataGenerator.java  |  1 -
 .../gobblin/cluster/GobblinClusterManager.java  | 21 +++----
 .../gobblin/cluster/GobblinClusterUtils.java    | 11 ++--
 .../apache/gobblin/cluster/GobblinHelixJob.java |  7 +--
 .../cluster/GobblinHelixJobLauncher.java        |  8 +--
 .../cluster/GobblinHelixJobScheduler.java       | 19 +++----
 .../cluster/GobblinHelixMessagingService.java   | 10 ++--
 .../gobblin/cluster/GobblinHelixTask.java       |  1 -
 .../cluster/GobblinHelixTaskStateTracker.java   |  2 -
 .../gobblin/cluster/GobblinTaskRunner.java      | 14 ++---
 .../cluster/GobblinTaskRunnerMetrics.java       |  9 ---
 .../gobblin/cluster/GobblinTaskStateModel.java  |  1 -
 .../cluster/GobblinTaskStateModelFactory.java   |  1 -
 .../cluster/JobConfigurationManager.java        |  3 +-
 .../ScheduledJobConfigurationManager.java       | 13 +++--
 .../gobblin/cluster/SingleTaskLauncher.java     |  4 +-
 .../StreamingJobConfigurationManager.java       | 13 +++--
 .../gobblin/cluster/GobblinClusterKillTest.java | 15 +++--
 .../cluster/GobblinClusterUtilsTest.java        | 14 +++--
 .../cluster/GobblinHelixJobLauncherTest.java    | 12 ++--
 .../gobblin/cluster/GobblinHelixTaskTest.java   |  8 +--
 .../apache/gobblin/cluster/HelixUtilsTest.java  |  1 -
 .../cluster/JobConfigurationManagerTest.java    |  1 +
 .../gobblin/cluster/SingleTaskLauncherTest.java |  4 +-
 .../org/apache/gobblin/cluster/TestHelper.java  |  2 -
 .../TestShutdownMessageHandlerFactory.java      |  1 -
 .../compaction/ReflectionCompactorFactory.java  |  4 +-
 .../action/CompactionCompleteAction.java        |  5 +-
 .../CompactionCompleteFileOperationAction.java  | 30 +++++-----
 .../CompactionHiveRegistrationAction.java       | 23 ++++----
 .../action/CompactionMarkDirectoryAction.java   | 26 ++++-----
 .../audit/KafkaAuditCountHttpClient.java        | 26 +++++----
 .../audit/PinotAuditCountHttpClient.java        | 26 +++++----
 .../RecompactionCombineCondition.java           |  4 +-
 .../RecompactionConditionBasedOnDuration.java   | 11 ++--
 .../RecompactionConditionBasedOnFileCount.java  |  3 +-
 .../RecompactionConditionBasedOnRatio.java      | 13 ++---
 .../gobblin/compaction/dataset/Dataset.java     |  8 +--
 .../compaction/dataset/DatasetHelper.java       |  4 +-
 .../compaction/dataset/DatasetsFinder.java      |  4 +-
 .../dataset/TimeBasedSubDirDatasetsFinder.java  | 19 ++++---
 .../HiveRegistrationCompactorListener.java      |  2 +-
 .../CompactionLauncherWriter.java               |  7 ++-
 .../CompactionLauncherWriterBuilder.java        |  2 +
 .../HiveMetadataForCompactionExtractor.java     |  8 ++-
 ...veMetadataForCompactionExtractorFactory.java |  8 ++-
 .../hivebasedconstructs/MRCompactionEntity.java |  2 +
 .../ReflectionCompactorListenerFactory.java     |  4 +-
 .../SimpleCompactorCompletionListener.java      |  3 +-
 .../CompactionAvroJobConfigurator.java          | 60 +++++++++++---------
 .../compaction/mapreduce/MRCompactionTask.java  | 16 ++++--
 .../mapreduce/MRCompactionTaskFactory.java      |  1 -
 .../compaction/mapreduce/MRCompactor.java       | 26 ++++-----
 .../mapreduce/MRCompactorJobPropCreator.java    |  5 +-
 .../mapreduce/MRCompactorJobRunner.java         |  3 -
 .../avro/AvroKeyCombineFileRecordReader.java    |  5 +-
 .../AvroKeyRecursiveCombineFileInputFormat.java | 12 ----
 .../FieldAttributeBasedDeltaFieldsProvider.java |  7 +--
 .../compaction/parser/CompactionPathParser.java |  6 +-
 .../compaction/source/CompactionSource.java     | 57 ++++++++++---------
 .../compaction/suite/CompactionAvroSuite.java   | 20 ++++---
 .../compaction/suite/CompactionSuite.java       | 14 ++---
 .../verify/CompactionAuditCountVerifier.java    | 17 +++---
 .../verify/CompactionThresholdVerifier.java     | 15 ++---
 .../verify/CompactionTimeRangeVerifier.java     | 16 +++---
 .../compaction/verify/CompactionVerifier.java   |  4 +-
 .../verify/InputRecordCountHelper.java          | 29 +++++-----
 .../mapreduce/MRCompactionTaskTest.java         | 36 ++++++------
 .../mapreduce/RenameSourceDirectoryTest.java    | 18 +++---
 .../avro/ConfBasedDeltaFieldProviderTest.java   |  1 -
 .../conditions/RecompactionConditionTest.java   | 11 ++--
 .../verify/PinotAuditCountVerifierTest.java     | 16 ++++--
 .../profile/ConfigurableGlobDatasetFinder.java  | 24 ++++++--
 73 files changed, 435 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 6aeb89c..842e285 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.EventName;
 import org.apache.gobblin.runtime.EventMetadataUtils;
 import org.apache.gobblin.runtime.JobContext;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index d57c61e..1592cf8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -36,13 +35,6 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMetric;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -67,8 +59,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -80,9 +72,17 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.app.ApplicationException;
@@ -93,9 +93,6 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
 
 /**
  * The central cluster manager for Gobblin Clusters.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 6b6ead8..41f926e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,19 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
-import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
 @Alpha
 @Slf4j

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index bc9f88f..db34e3e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -20,20 +20,19 @@ package org.apache.gobblin.cluster;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import lombok.extern.slf4j.Slf4j;
-
-
 import org.quartz.InterruptableJob;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
-import org.quartz.UnableToInterruptJobException;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index d502462..5035216 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -25,8 +25,6 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +45,8 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.StateStore;
@@ -55,11 +55,13 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.rest.LauncherTypeEnum;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.ExecutionModel;
+import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -69,8 +71,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.SerializationUtils;
 
-import javax.annotation.Nullable;
-
 
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index ef12162..e539273 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -30,8 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
@@ -42,31 +40,32 @@ import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.api.JobExecutionLauncher;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.listeners.AbstractJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
-
-
-import javax.annotation.Nonnull;
-import lombok.Getter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index cdced1b..4101f17 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -16,29 +16,29 @@
  */
 package org.apache.gobblin.cluster;
 
-import com.google.common.base.Strings;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.UUID;
 import java.util.regex.Pattern;
+
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.messaging.CriteriaEvaluator;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.ZNRecordRow;
-import org.apache.helix.PropertyKey;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index c6b9514..e651b8e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -35,7 +35,6 @@ import com.google.common.io.Closer;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
index 79df1fa..7120bef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
@@ -22,8 +22,6 @@ import java.util.Properties;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 
-import org.apache.helix.HelixManager;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 3ec40dc..e68774d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -36,11 +36,6 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -74,9 +69,15 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.services.JMXReportingService;
@@ -86,9 +87,6 @@ import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.PathUtils;
 
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
 import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
index 51e8b36..6435ff4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
@@ -17,17 +17,8 @@
 
 package org.apache.gobblin.cluster;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import com.codahale.metrics.Metric;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
index 5a96e48..cc4a215 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java
@@ -25,7 +25,6 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModel;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.TaskExecutor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
index 335a1e0..3603571 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java
@@ -26,7 +26,6 @@ import org.apache.helix.task.TaskStateModel;
 import org.apache.helix.task.TaskStateModelFactory;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.TaskExecutor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 42fab27..d60540c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.util.List;
 import java.util.Properties;
 
-import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +29,8 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index 0f2d356..00c7b0d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -27,23 +27,24 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
 @Alpha

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
index 8cbbc00..3a0c780 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.text.StrTokenizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.util.GobblinProcessBuilder;
 import org.apache.gobblin.util.SystemPropertiesWrapper;
 
-import com.typesafe.config.Config;
-
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.CLUSTER_CONFIG_FILE_PATH;
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.JOB_ID;
 import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.WORK_UNIT_FILE_PATH;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index 3c01704..b6985a1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -25,27 +25,28 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-
-import lombok.Getter;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
index ad19135..11b8808 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,9 +26,9 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
@@ -43,6 +38,14 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
 
 /**
  * Unit tests for killing {@link GobblinClusterManager}s and {@link GobblinTaskRunner}s

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 4d83658..9288d42 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -17,18 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
 public class GobblinClusterUtilsTest {
 
   FileSystem fs = mock(FileSystem.class);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 10ef3db..ef5fc4f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -29,10 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Schema;
 import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.runtime.JobContext;
-import org.apache.gobblin.runtime.listeners.AbstractJobListener;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,16 +52,20 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
-import lombok.Getter;
-
 
 /**
  * Unit tests for {@link GobblinHelixJobLauncher}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 1cbd72e..a104b7d 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -17,26 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
-import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.metastore.FsStateStore;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
-
 import org.mockito.Mockito;
-
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -44,10 +38,12 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
 import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskExecutor;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index 2afd01f..d7841a8 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
-
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
index 73098f2..9e36d8c 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
@@ -39,6 +39,7 @@ import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
+
 import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
index afa933d..3c56428 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java
@@ -25,11 +25,11 @@ import java.util.List;
 
 import org.testng.annotations.Test;
 
+import com.typesafe.config.ConfigFactory;
+
 import org.apache.gobblin.util.GobblinProcessBuilder;
 import org.apache.gobblin.util.SystemPropertiesWrapper;
 
-import com.typesafe.config.ConfigFactory;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
index b717585..e8b32ab 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java
@@ -26,12 +26,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
-
 import org.testng.Assert;
 
 import com.google.common.io.Closer;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
index c0b80a9..cd805bf 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java
@@ -25,7 +25,6 @@ import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
-
 import org.testng.Assert;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
index fc68e47..1ba6fac 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
@@ -20,11 +20,11 @@ package org.apache.gobblin.compaction;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-
 import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.metrics.Tag;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
index a5b21bc..8c5979c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java
@@ -16,11 +16,12 @@
  */
 
 package org.apache.gobblin.compaction.action;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 
 import java.io.IOException;
 
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
 
 /**
  * An interface which represents an action that is invoked after a compaction job is finished.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index a21ca93..831443b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -17,9 +17,21 @@
 
 package org.apache.gobblin.compaction.action;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -33,20 +45,6 @@ import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index 7792a50..a7536d3 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -17,6 +17,18 @@
 
 package org.apache.gobblin.compaction.action;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -29,17 +41,6 @@ import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
 
 /**
  * Class responsible for hive registration after compaction is complete

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index b504996..89be94a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -17,6 +17,19 @@
 
 package org.apache.gobblin.compaction.action;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -25,19 +38,6 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
 
 @Slf4j
 @AllArgsConstructor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
index 7f6fb68..04988fe 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
@@ -17,15 +17,11 @@
 
 package org.apache.gobblin.compaction.audit;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.gobblin.configuration.State;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import javax.annotation.concurrent.ThreadSafe;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
 
 /**
  * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
index 0e06606..5889edd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java
@@ -17,15 +17,11 @@
 
 package org.apache.gobblin.compaction.audit;
 
-import com.google.api.client.util.Charsets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.gobblin.configuration.State;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
 
-import javax.annotation.concurrent.ThreadSafe;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
-import java.net.URLEncoder;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.api.client.util.Charsets;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
 
 /**
  * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
index 811be5b..a7731f6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java
@@ -17,17 +17,17 @@
 
 package org.apache.gobblin.compaction.conditions;
 
-
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.ImmutableList;
 
 import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
index c432ed2..3704fab 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.compaction.conditions;
 
-
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.joda.time.format.PeriodFormatter;
@@ -32,6 +26,11 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
index b8817fe..9e8e25b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java
@@ -16,13 +16,14 @@
  */
 
 package org.apache.gobblin.compaction.conditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.dataset.Dataset;
 
 /**
  * An implementation {@link RecompactionCondition} which examines the number of files in the late outputDir

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
index d58a17d..e2bcdb4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java
@@ -17,16 +17,9 @@
 
 package org.apache.gobblin.compaction.conditions;
 
-
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.util.DatasetFilterUtils;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +27,12 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Maps;
 
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.util.DatasetFilterUtils;
+
 
 /**
  * An implementation {@link RecompactionCondition} which examines the late record percentage.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
index 73f12a9..36ec939 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java
@@ -22,10 +22,6 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 
@@ -33,6 +29,10 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
index 2eede65..ca05822 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java
@@ -26,14 +26,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.joda.time.DateTimeZone;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.base.Optional;
 
 import org.apache.gobblin.compaction.conditions.RecompactionCondition;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionFactory;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
index cd56460..6ed04af 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java
@@ -17,17 +17,14 @@
 
 package org.apache.gobblin.compaction.dataset;
 
-import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +36,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
 import org.apache.gobblin.util.DatasetFilterUtils;
 import org.apache.gobblin.util.HadoopUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
index b00d7f4..111fe75 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
@@ -17,12 +17,9 @@
 
 package org.apache.gobblin.compaction.dataset;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.DatasetFilterUtils;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.Set;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +32,14 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
-import java.io.IOException;
-import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.DatasetFilterUtils;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
index 3eec1d5..d6b5d81 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java
@@ -19,8 +19,8 @@ package org.apache.gobblin.compaction.hive.registration;
 
 import java.util.Properties;
 
-import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
index c6a66c6..43c5fbc 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java
@@ -21,15 +21,18 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Joiner;
+
 import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
index d108343..156cc54 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.compaction.hivebasedconstructs;
 
 import java.io.IOException;
+
 import org.apache.avro.Schema;
+
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.DataWriterBuilder;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
index c5b817e..020da2d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
@@ -19,20 +19,24 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
 
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
+
 import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
 import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.util.AutoReturnableObject;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
-import lombok.extern.slf4j.Slf4j;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
index 27958b5..ab89df5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
@@ -17,14 +17,16 @@
 
 package org.apache.gobblin.compaction.hivebasedconstructs;
 
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
-import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
 import java.io.IOException;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
 
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor;
+import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
+
 
 /**
  * Factory for {@link HiveMetadataForCompactionExtractor}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
index 2d721d0..1f17955 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
 
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.hadoop.fs.Path;
+
 import lombok.Getter;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
index e9a1fca..b780c2c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java
@@ -21,12 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-
 import org.apache.gobblin.configuration.State;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
index b4365bc..e82bb33 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java
@@ -18,12 +18,13 @@
 package org.apache.gobblin.compaction.listeners;
 
 import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index b8f407f..c9b7708 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,25 +17,16 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import org.apache.gobblin.compaction.mapreduce.avro.*;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
@@ -52,15 +43,30 @@ import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
+import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
 
 /**
  * A configurator that focused on creating avro compaction map-reduce job

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index be71804..78ed1c2 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -17,6 +17,16 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -27,13 +37,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
 
-import java.util.List;
-import java.io.IOException;
-import java.util.Map;
 
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.mapreduce.Job;
-import com.google.common.collect.ImmutableMap;
 
 /**
  * Customized task of type {@link MRTask}, which runs MR job to compact dataset.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
index ff8b012..9d5b749 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java
@@ -18,7 +18,6 @@ package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 
-
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.mapreduce.MRTaskFactory;
 import org.apache.gobblin.runtime.task.TaskIFace;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 9e53ef4..953c0dc 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -17,13 +17,6 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED;
-import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED;
-import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED;
-import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -36,9 +29,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import java.lang.reflect.InvocationTargetException;
 
-import org.joda.time.DateTime;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -66,13 +57,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.gobblin.compaction.Compactor;
-import org.apache.gobblin.compaction.listeners.CompactorCompletionListener;
-import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory;
-import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.compaction.dataset.Dataset;
 import org.apache.gobblin.compaction.dataset.DatasetsFinder;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.compaction.listeners.CompactorCompletionListener;
+import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory;
+import org.apache.gobblin.compaction.listeners.CompactorListener;
 import org.apache.gobblin.compaction.verify.DataCompletenessVerifier;
 import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -81,15 +72,22 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ClusterNameTags;
 import org.apache.gobblin.util.DatasetFilterUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.ClusterNameTags;
 import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED;
+import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED;
+import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED;
+import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;
+
 /**
  * MapReduce-based {@link org.apache.gobblin.compaction.Compactor}. Compaction will run on each qualified {@link Dataset}
  * under {@link #COMPACTION_INPUT_DIR}.