You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 23:48:06 UTC

incubator-gobblin git commit: [GOBBLIN-423] Add record count limit to salesforce source

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d323f6022 -> 5b8f7dfb1


[GOBBLIN-423] Add record count limit to salesforce source

Closes #2300 from yukuai518/limit


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5b8f7dfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5b8f7dfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5b8f7dfb

Branch: refs/heads/master
Commit: 5b8f7dfb10fa8eddaedfbb13df6adcaf018acf65
Parents: d323f60
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Mar 22 16:47:59 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 22 16:47:59 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   7 +-
 .../java/org/apache/gobblin/source/Source.java  |   9 ++
 .../apache/gobblin/cluster/GobblinHelixJob.java |  16 +--
 .../cluster/GobblinHelixJobScheduler.java       | 119 +++++++++++++++----
 .../extractor/extract/AbstractSource.java       |   5 +
 .../source/extractor/extract/ExtractType.java   |   8 +-
 .../extractor/extract/QueryBasedSource.java     |   2 +-
 .../source/extractor/partition/Partitioner.java |  28 +++--
 .../source/extractor/watermark/Watermark.java   |  12 +-
 .../gobblin/runtime/AbstractJobLauncher.java    |   4 +
 .../org/apache/gobblin/runtime/JobLauncher.java |   4 +
 .../apache/gobblin/runtime/SourceDecorator.java |   5 +
 .../apache/gobblin/scheduler/JobScheduler.java  |  39 ++++--
 .../gobblin/salesforce/SalesforceSource.java    |  72 +++++++++--
 14 files changed, 260 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 612fd8b..70459a2 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -88,6 +88,10 @@ public class ConfigurationKeys {
   /**
    * Job scheduler configuration properties.
    */
+  // Job retriggering
+  public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
+  public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
+
   // Job executor thread pool size
   public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
   public static final int DEFAULT_JOB_EXECUTOR_THREAD_POOL_SIZE = 5;
@@ -473,6 +477,8 @@ public class ConfigurationKeys {
   public static final String SOURCE_MAX_NUMBER_OF_PARTITIONS = "source.max.number.of.partitions";
   public static final String SOURCE_SKIP_FIRST_RECORD = "source.skip.first.record";
   public static final String SOURCE_COLUMN_NAME_CASE = "source.column.name.case";
+  public static final String SOURCE_EARLY_STOP_ENABLED = "source.earlyStop.enabled";
+  public static final boolean DEFAULT_SOURCE_EARLY_STOP_ENABLED = false;
 
   /**
    * Configuration properties used by the QueryBasedExtractor.
@@ -583,7 +589,6 @@ public class ConfigurationKeys {
   public static final String DEFAULT_SOURCE_QUERYBASED_IS_METADATA_COLUMN_CHECK_ENABLED = "true";
   public static final String DEFAULT_COLUMN_NAME_CASE = "NOCHANGE";
   public static final int DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE = 1000;
-
   public static final String FILEBASED_REPORT_STATUS_ON_COUNT = "filebased.report.status.on.count";
   public static final int DEFAULT_FILEBASED_REPORT_STATUS_ON_COUNT = 10000;
   public static final String DEFAULT_SOURCE_TIMEZONE = PST_TIMEZONE_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
index 1edfd8d..37e73c8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
@@ -94,4 +94,13 @@ public interface Source<S, D> {
    * @param state see {@link SourceState}
    */
   public abstract void shutdown(SourceState state);
+
+  /**
+   * Instead of handling all {@link WorkUnit}s in one run, some {@link Source}s may choose to stop early in order to keep
+   * the workload of a single run manageable; the remaining work is then handled by one or more subsequent runs.
+   * @return true if this source stopped early in the current run
+   */
+  public default boolean isEarlyStopped() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index b8fd2f4..bc9f88f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -17,15 +17,11 @@
 
 package org.apache.gobblin.cluster;
 
-import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.helix.HelixManager;
 
 import org.quartz.InterruptableJob;
 import org.quartz.Job;
@@ -34,8 +30,6 @@ import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
@@ -56,24 +50,18 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob
   @Override
   public void executeImpl(JobExecutionContext context) throws JobExecutionException {
     JobDataMap dataMap = context.getJobDetail().getJobDataMap();
-    ConcurrentHashMap runningMap = (ConcurrentHashMap)dataMap.get(GobblinHelixJobScheduler.JOB_RUNNING_MAP);
     final JobScheduler jobScheduler = (JobScheduler) dataMap.get(JobScheduler.JOB_SCHEDULER_KEY);
     // the properties may get mutated during job execution and the scheduler reuses it for the next round of scheduling,
     // so clone it
     final Properties jobProps = (Properties)((Properties) dataMap.get(JobScheduler.PROPERTIES_KEY)).clone();
     final JobListener jobListener = (JobListener) dataMap.get(JobScheduler.JOB_LISTENER_KEY);
-    HelixManager helixManager = (HelixManager) dataMap.get(GobblinHelixJobScheduler.HELIX_MANAGER_KEY);
-    Path appWorkDir = (Path) dataMap.get(GobblinHelixJobScheduler.APPLICATION_WORK_DIR_KEY);
-    @SuppressWarnings("unchecked")
-    List<? extends Tag<?>> eventMetadata = (List<? extends Tag<?>>) dataMap.get(GobblinHelixJobScheduler.METADATA_TAGS);
 
     try {
-      final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata, runningMap);
       if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
               Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
-        jobScheduler.runJob(jobProps, jobListener, jobLauncher);
+        jobScheduler.runJob(jobProps, jobListener);
       } else {
-        cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher);
+        cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener);
       }
     } catch (Throwable t) {
       throw new JobExecutionException(t);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 48b12f2..ef12162 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -19,23 +19,25 @@ package org.apache.gobblin.cluster;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
@@ -47,8 +49,6 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
-import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
@@ -66,6 +66,7 @@ import org.apache.gobblin.scheduler.SchedulerService;
 
 
 import javax.annotation.Nonnull;
+import lombok.Getter;
 
 
 /**
@@ -79,11 +80,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
 
-  static final String HELIX_MANAGER_KEY = "helixManager";
-  static final String APPLICATION_WORK_DIR_KEY = "applicationWorkDir";
-  static final String METADATA_TAGS = "metadataTags";
-  static final String JOB_RUNNING_MAP = "jobRunningMap";
-
   private final Properties properties;
   private final HelixManager helixManager;
   private final EventBus eventBus;
@@ -262,13 +258,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   @Override
   public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
-    Map<String, Object> additionalJobDataMap = Maps.newHashMap();
-    additionalJobDataMap.put(HELIX_MANAGER_KEY, this.helixManager);
-    additionalJobDataMap.put(APPLICATION_WORK_DIR_KEY, this.appWorkDir);
-    additionalJobDataMap.put(METADATA_TAGS, this.metadataTags);
-    additionalJobDataMap.put(JOB_RUNNING_MAP, this.jobRunningMap);
     try {
-      scheduleJob(jobProps, jobListener, additionalJobDataMap, GobblinHelixJob.class);
+      scheduleJob(jobProps, jobListener, Maps.newHashMap(), GobblinHelixJob.class);
     } catch (Exception e) {
       throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
@@ -280,19 +271,101 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
-    try {
-      JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps);
-      runJob(jobProps, jobListener, jobLauncher);
-    } catch (Exception e) {
-      throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
-    }
+    new RetriggeringJobCallable(jobProps, jobListener).call();
   }
 
-  private GobblinHelixJobLauncher buildGobblinHelixJobLauncher(Properties jobProps)
+  @Override
+  public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps)
       throws Exception {
     return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap);
   }
 
+  private class RetriggeringJobCallable implements Callable {
+    Properties jobProps;
+    JobListener jobListener;
+
+    public RetriggeringJobCallable(Properties jobProps, JobListener jobListener) {
+      this.jobProps = jobProps;
+      this.jobListener = jobListener;
+    }
+
+    private boolean isRetriggeringEnabled() {
+      return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED, ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED);
+    }
+
+    @Getter
+    JobLauncher currentJobLauncher = null;
+
+    @Override
+    public Void call() throws JobException {
+      try {
+        while (true) {
+          currentJobLauncher = buildJobLauncher(jobProps);
+          boolean isEarlyStopped = runJob(jobProps, jobListener, currentJobLauncher);
+          boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+          if (isEarlyStopped && isRetriggerEnabled) {
+            LOGGER.info("Job {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          } else {
+            break;
+          }
+          currentJobLauncher = null;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+        throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      }
+
+      return null;
+    }
+  }
+
+  public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) {
+    RetriggeringJobCallable retriggeringJob = new RetriggeringJobCallable(jobProps, jobListener);
+    final Future<?> future = this.jobExecutor.submit(retriggeringJob);
+    return new Future() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (!GobblinHelixJobScheduler.this.isCancelRequested()) {
+          return false;
+        }
+        boolean result = true;
+        try {
+          JobLauncher jobLauncher = retriggeringJob.getCurrentJobLauncher();
+          if (jobLauncher != null) {
+            jobLauncher.cancelJob(jobListener);
+          }
+        } catch (JobException e) {
+          LOGGER.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+          result = false;
+        }
+        if (mayInterruptIfRunning) {
+          result &= future.cancel(true);
+        }
+        return result;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return future.isCancelled();
+      }
+
+      @Override
+      public boolean isDone() {
+        return future.isDone();
+      }
+
+      @Override
+      public Object get() throws InterruptedException, ExecutionException {
+        return future.get();
+      }
+
+      @Override
+      public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return future.get(timeout, unit);
+      }
+    };
+  }
+
   @Subscribe
   public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
     LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
index db51597..d7fd05e 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
@@ -139,4 +139,9 @@ public abstract class AbstractSource<S, D> implements Source<S, D> {
   public Extract createExtract(TableType type, String namespace, String table) {
     return this.extractFactory.getUniqueExtract(type, namespace, table);
   }
+
+  @Override
+  public boolean isEarlyStopped() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
index a8ae7bf..49cfae9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.source.extractor.extract;
 
+/**
+ * Different extract types
+ */
 public enum ExtractType {
-  SNAPSHOT, APPEND_DAILY, APPEND_HOURLY, APPEND_BATCH
+  SNAPSHOT,           // Used iff user wants highwatermark to be set to latest.
+  APPEND_DAILY,       // Used iff user wants highwatermark to be set to a fixed point, like CURRENTDATE - <backoff days>.
+  APPEND_HOURLY,      // Used iff user wants highwatermark to be set to a fixed point, like CURRENTHOUR - <backoff hours>.
+  APPEND_BATCH        // <Please document this>
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index 29c98d9..5d5330d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -198,7 +198,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
         combinedState.setProp(ConfigurationKeys.SOURCE_QUERYBASED_END_VALUE, previousWatermark);
       }
 
-      workUnits.addAll(generateWorkUnits(sourceEntity, state, previousWatermark));
+      workUnits.addAll(generateWorkUnits(sourceEntity, combinedState, previousWatermark));
     }
 
     log.info("Total number of workunits for the current run: " + workUnits.size());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
index 90da5b5..052d6b4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
@@ -53,6 +53,7 @@ public class Partitioner {
   public static final String WATERMARKTIMEFORMAT = "yyyyMMddHHmmss";
   public static final String HAS_USER_SPECIFIED_PARTITIONS = "partitioner.hasUserSpecifiedPartitions";
   public static final String USER_SPECIFIED_PARTITIONS = "partitioner.userSpecifiedPartitions";
+  public static final String IS_EARLY_STOPPED = "partitioner.isEarlyStopped";
 
   public static final Comparator<Partition> ascendingComparator = new Comparator<Partition>() {
     @Override
@@ -109,7 +110,9 @@ public class Partitioner {
    * Get partitions with low and high water marks
    *
    * @param previousWatermark previous water mark from metadata
-   * @return map of partition intervals
+   * @return map of partition intervals.
+   *         map's key is interval begin time (in format {@link Partitioner#WATERMARKTIMEFORMAT})
+   *         map's value is interval end time (in format {@link Partitioner#WATERMARKTIMEFORMAT})
    */
   @Deprecated
   public HashMap<Long, Long> getPartitions(long previousWatermark) {
@@ -196,9 +199,12 @@ public class Partitioner {
    * Generate the partitions based on the lists specified by the user in job config
    */
   private List<Partition> createUserSpecifiedPartitions() {
+
     List<Partition> partitions = new ArrayList<>();
 
     List<String> watermarkPoints = state.getPropAsList(USER_SPECIFIED_PARTITIONS);
+    boolean isEarlyStopped = state.getPropAsBoolean(IS_EARLY_STOPPED);
+
     if (watermarkPoints == null || watermarkPoints.size() == 0 ) {
       LOG.info("There should be some partition points");
       long defaultWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
@@ -234,7 +240,9 @@ public class Partitioner {
     highWatermark = adjustWatermark(watermarkPoints.get(i), watermarkType);
     ExtractType extractType =
         ExtractType.valueOf(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE).toUpperCase());
-    if (isFullDump() || isSnapshot(extractType)) {
+
+    // If the source stopped early, do not remove the upper bound of the last partition
+    if ((isFullDump() || isSnapshot(extractType)) && !isEarlyStopped) {
       // The upper bounds can be removed for last work unit
       partitions.add(new Partition(lowWatermark, highWatermark, true, false));
     } else {
@@ -242,6 +250,7 @@ public class Partitioner {
       partitions.add(new Partition(lowWatermark, highWatermark, true, true));
     }
 
+
     return partitions;
   }
 
@@ -291,18 +300,21 @@ public class Partitioner {
   }
 
   /**
-   * Get low water mark
+   * Get low water mark:
+   *  (1) Use {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff it is a full dump (or watermark override is enabled)
+   *  (2) Otherwise use previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
    *
    * @param extractType Extract type
    * @param watermarkType Watermark type
    * @param previousWatermark Previous water mark
    * @param deltaForNextWatermark delta number for next water mark
-   * @return low water mark
+   * @return low water mark in {@link Partitioner#WATERMARKTIMEFORMAT}
    */
   @VisibleForTesting
   protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkType, long previousWatermark,
       int deltaForNextWatermark) {
     long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
+
     if (this.isFullDump() || this.isWatermarkOverride()) {
       String timeZone =
           this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE, ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE);
@@ -331,7 +343,7 @@ public class Partitioner {
    * @param watermarkType Watermark type
    * @param previousWatermark Previous water mark
    * @param deltaForNextWatermark delta number for next water mark
-   * @return snapshot low water mark
+   * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
    */
   private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
     LOG.debug("Getting snapshot low water mark");
@@ -362,7 +374,7 @@ public class Partitioner {
    * @param watermarkType Watermark type
    * @param previousWatermark Previous water mark
    * @param deltaForNextWatermark delta number for next water mark
-   * @return append low water mark
+   * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
    */
   private long getAppendLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
     LOG.debug("Getting append low water mark");
@@ -384,7 +396,7 @@ public class Partitioner {
    *
    * @param extractType Extract type
    * @param watermarkType Watermark type
-   * @return high water mark
+   * @return high water mark in {@link Partitioner#WATERMARKTIMEFORMAT}
    */
   @VisibleForTesting
   protected long getHighWatermark(ExtractType extractType, WatermarkType watermarkType) {
@@ -598,6 +610,8 @@ public class Partitioner {
   }
 
   /**
+   * If full dump is true, the low watermark will be based on {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE}.
+   * Otherwise it will be based on the previous watermark. Please refer to {@link Partitioner#getLowWatermark(ExtractType, WatermarkType, long, int)}
    * @return full dump or not
    */
   public boolean isFullDump() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
index e83fc65..53827b4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
@@ -26,8 +26,8 @@ public interface Watermark {
    * Condition statement with the water mark value using the operator. Example (last_updated_ts >= 2013-01-01 00:00:00
    *
    * @param extractor
-   * @param water mark value
-   * @param relational operator between water mark column and value
+   * @param watermarkValue water mark value
+   * @param operator relational operator between water mark column and value
    * @return condition statement
    */
   public String getWatermarkCondition(QueryBasedExtractor<?, ?> extractor, long watermarkValue, String operator);
@@ -35,10 +35,10 @@ public interface Watermark {
   /**
    * Get partitions for the given range
    *
-   * @param low water mark value
-   * @param high water mark value
-   * @param partition interval(in hours or days)
-   * @param maximum number of partitions
+   * @param lowWatermarkValue low water mark value
+   * @param highWatermarkValue high water mark value
+   * @param partitionInterval partition interval (in hours or days)
+   * @param maxIntervals maximum number of partitions
    * @return partitions
    */
   public HashMap<Long, Long> getIntervals(long lowWatermarkValue, long highWatermarkValue, long partitionInterval,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index da770af..0c50c32 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -887,6 +887,10 @@ public abstract class AbstractJobLauncher implements JobLauncher {
     }
   }
 
+  public boolean isEarlyStopped() {
+    return this.jobContext.getSource().isEarlyStopped();
+  }
+
   /**
    * Staging data cannot be cleaned if exactly once semantics is used, and the job has unfinished
    * commit sequences.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
index 75c558f..52284b7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
@@ -67,4 +67,8 @@ public interface JobLauncher extends Closeable {
    */
   public void cancelJob(@Nullable JobListener jobListener)
       throws JobException;
+
+  public default boolean isEarlyStopped() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index 9471444..4558325 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -116,4 +116,9 @@ public class SourceDecorator<S, D> implements WorkUnitStreamSource<S, D>, Decora
   public Object getDecoratedObject() {
     return this.source;
   }
+
+  @Override
+  public boolean isEarlyStopped() {
+    return this.source.isEarlyStopped();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 7ef6d85..f2e254d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -24,14 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hadoop.fs.Path;
@@ -63,6 +62,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.AbstractIdleService;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -135,6 +135,7 @@ public class JobScheduler extends AbstractIdleService {
 
   private final Closer closer = Closer.create();
 
+  @Getter
   private volatile boolean cancelRequested = false;
 
   public JobScheduler(Properties properties, SchedulerService scheduler)
@@ -254,17 +255,20 @@ public class JobScheduler extends AbstractIdleService {
    *                      with scheduling the job
    */
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) {
-    Runnable runnable = new Runnable() {
+    Callable<Void> callable = new Callable<Void>() {
       @Override
-      public void run() {
+      public Void call() throws JobException {
         try {
           runJob(jobProps, jobListener, jobLauncher);
         } catch (JobException je) {
           LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+          throw je;
         }
+        return null;
       }
     };
-    final Future<?> future = this.jobExecutor.submit(runnable);
+
+    final Future<?> future = this.jobExecutor.submit(callable);
     return new Future() {
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
@@ -306,6 +310,22 @@ public class JobScheduler extends AbstractIdleService {
     };
   }
 
+  /**
+   * Schedule a job immediately, using a {@link JobLauncher} created by {@link #buildJobLauncher(Properties)}.
+   *
+   * @param jobProps Job configuration properties
+   * @param jobListener {@link JobListener} used for callback
+   * @return a {@link Future} for the scheduled job
+   * @throws JobException if building the {@link JobLauncher} or scheduling the job fails
+   */
+  public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) throws JobException {
+    try {
+      return scheduleJobImmediately(jobProps, jobListener, buildJobLauncher(jobProps));
+    } catch (Exception e) {
+      throw new JobException("Job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + " cannot be immediately scheduled.", e);
+    }
+  }
+
   /**
    * Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}.
    * @param runnable the runnable to submit to the job executor
@@ -419,12 +439,23 @@ public class JobScheduler extends AbstractIdleService {
   public void runJob(Properties jobProps, JobListener jobListener)
       throws JobException {
     try {
-      runJob(jobProps, jobListener, JobLauncherFactory.newJobLauncher(this.properties, jobProps));
+      runJob(jobProps, jobListener, buildJobLauncher(jobProps));
     } catch (Exception e) {
       throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
   }
 
+  /**
+   * Build a {@link JobLauncher} for a job from this scheduler's properties and the given job properties.
+   *
+   * @param jobProps Job configuration properties
+   * @return a new {@link JobLauncher} created by {@link JobLauncherFactory}
+   * @throws Exception if the {@link JobLauncher} cannot be created
+   */
+  public JobLauncher buildJobLauncher(Properties jobProps) throws Exception {
+    return JobLauncherFactory.newJobLauncher(this.properties, jobProps);
+  }
+
   /**
    * Run a job.
    *
@@ -441,9 +472,11 @@ public class JobScheduler extends AbstractIdleService {
    * @param jobProps Job configuration properties
    * @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed.
    * @param jobLauncher a {@link JobLauncher} object used to launch the job to run
+   * @return {@code true} if the {@link JobLauncher} reported an early stop, meaning the job needs retriggering;
+   *         {@code false} otherwise, including when the job is disabled
    * @throws JobException when there is anything wrong with running the job
    */
-  public void runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)
+  public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)
       throws JobException {
     Preconditions.checkArgument(jobProps.containsKey(ConfigurationKeys.JOB_NAME_KEY),
         "A job must have a job name specified by job.name");
@@ -453,16 +486,20 @@ public class JobScheduler extends AbstractIdleService {
     boolean disabled = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_DISABLED_KEY, "false"));
     if (disabled) {
       LOG.info("Skipping disabled job " + jobName);
-      return;
+      return false;
     }
 
     // Launch the job
     try (Closer closer = Closer.create()) {
       closer.register(jobLauncher).launchJob(jobListener);
       boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
-      if (runOnce && this.scheduledJobs.containsKey(jobName)) {
+      boolean isEarlyStopped = jobLauncher.isEarlyStopped();
+      // An early-stopped run-once job is not unscheduled here; the returned flag tells the caller to retrigger it.
+      if (!isEarlyStopped && runOnce && this.scheduledJobs.containsKey(jobName)) {
         this.scheduler.getScheduler().deleteJob(this.scheduledJobs.remove(jobName));
       }
+
+      return isEarlyStopped;
     } catch (Throwable t) {
       throw new JobException("Failed to launch and run job " + jobName, t);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 872fceb..b16a261 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -83,7 +83,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   public static final String USE_ALL_OBJECTS = "use.all.objects";
   public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
 
-  private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
   private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
   private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
   private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
@@ -101,10 +100,17 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} "
       + "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
 
+  private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
+  // In early-stop mode, work units stop at the first histogram group that pushes the accumulated record count past this limit.
+  private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
+  private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4;
+
   private static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss";
   private static final String ZERO_TIME_SUFFIX = "-00:00:00";
 
   private static final Gson GSON = new Gson();
+  // Set to true when early stop truncates the work units of a source entity.
+  private boolean isEarlyStopped = false;
 
   @VisibleForTesting
   SalesforceSource(LineageInfo lineageInfo) {
@@ -122,6 +128,14 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   }
 
+  /**
+   * @return {@code true} if early stop truncated the work units of at least one source entity
+   */
   @Override
+  public boolean isEarlyStopped() {
+    return isEarlyStopped;
+  }
+
+  @Override
   protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
     DatasetDescriptor source =
         new DatasetDescriptor(DatasetConstants.PLATFORM_SALESFORCE, entity.getSourceEntityName());
@@ -147,17 +156,65 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       return super.generateWorkUnits(sourceEntity, state, previousWatermark);
     }
 
-    Partition partition = new Partitioner(state).getGlobalPartition(previousWatermark);
+    boolean earlyStopEnabled = isEarlyStopEnabled(state);
+    Partitioner partitioner = new Partitioner(state);
+    if (earlyStopEnabled && partitioner.isFullDump()) {
+      throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
+    }
+
+    Partition partition = partitioner.getGlobalPartition(previousWatermark);
     Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition);
 
-    String specifiedPartitions = generateSpecifiedPartitions(histogram, minTargetPartitionSize, maxPartitions,
-        partition.getLowWatermark(), partition.getHighWatermark());
+    // In early-stop mode, keep whole histogram groups until the accumulated record count exceeds the limit
+    // (the group that crosses the limit is kept) and drop the remaining groups.
+    // TODO: consider moving this logic into getRefinedHistogram so that the search can terminate early
+    Histogram histogramAdjust;
+    if (earlyStopEnabled) {
+      long totalRecordsLimit =
+          state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT);
+      histogramAdjust = new Histogram();
+      for (HistogramGroup group : histogram.getGroups()) {
+        histogramAdjust.add(group);
+        if (histogramAdjust.getTotalRecordCount() > totalRecordsLimit) {
+          break;
+        }
+      }
+    } else {
+      histogramAdjust = histogram;
+    }
+
+    // Whether THIS source entity was cut short. Unlike the instance-level isEarlyStopped flag, this is never
+    // carried over from a previously processed entity, so it is what gets recorded in the entity's state.
+    boolean entityEarlyStopped = false;
+    long expectedHighWatermark = partition.getHighWatermark();
+    if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
+      // The key of the first dropped group becomes the high watermark of this run.
+      HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
+      long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
+      log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
+      entityEarlyStopped = true;
+      this.isEarlyStopped = true;
+      expectedHighWatermark = earlyStopHighWatermark;
+    } else {
+      log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark);
+    }
+
+    String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions,
+        partition.getLowWatermark(), expectedHighWatermark);
     state.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true);
     state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions);
+    state.setProp(Partitioner.IS_EARLY_STOPPED, entityEarlyStopped);
 
     return super.generateWorkUnits(sourceEntity, state, previousWatermark);
   }
 
+  /**
+   * Returns whether early-stop mode is enabled via {@link ConfigurationKeys#SOURCE_EARLY_STOP_ENABLED}.
+   */
+  private boolean isEarlyStopEnabled(State state) {
+    return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED);
+  }
+
   String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSize, int maxPartitions, long lowWatermark,
       long expectedHighWatermark) {
     int interval = computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions);
@@ -183,7 +231,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
         partitionPoints.add(Utils.toDateTimeFormat(group.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
       }
 
-      // Move the candidate to a new bucket if the attempted total is 2x of interval
+      // Greedy bucketing: keep adding groups to the current bucket until adding the next group would bring the
+      // bucket to at least 2x the interval size; then close the current bucket and start a new one with that group.
+      // When a bucket is closed, it and the group that triggered the split together hold at least 2x the interval,
+      // so buckets are filled without leaving unused capacity between them. A larger multiplier (3x, 4x, ...) would
+      // also work but is less space efficient.
       if (count != 0 && count + group.count >= 2 * interval) {
         // Summarize current group
         statistics.addValue(count);
@@ -351,7 +403,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
     // make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group
     List<HistogramGroup> list = new ArrayList(histogram.getGroups());
-    Date hwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+    Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
     list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SECONDS_FORMAT), 0));
 
     for (int i = 0; i < list.size() - 1; i++) {
@@ -447,7 +499,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
     // exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured
     // in the range of generated partitions
-    HistogramGroup firstGroup = histogram.getGroups().get(0);
+    HistogramGroup firstGroup = histogram.get(0);
     Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
     histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SECONDS_FORMAT),
         firstGroup.getCount()));
@@ -523,6 +575,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       totalRecordCount += histogram.totalRecordCount;
     }
 
+    /**
+     * Returns the group at position {@code idx} in this histogram.
+     */
+    HistogramGroup get(int idx) {
+      return this.groups.get(idx);
+    }
+
     @Override
     public String toString() {
       return groups.toString();
@@ -555,7 +614,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
     } catch (RestApiProcessingException e) {
       throw Throwables.propagate(e);
     }
-
   }
 
   private static Set<SourceEntity> getSourceEntities(String response) {