You are viewing a plain-text version of this message; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 23:48:06 UTC
incubator-gobblin git commit: [GOBBLIN-423] Add record count limit to salesforce source
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d323f6022 -> 5b8f7dfb1
[GOBBLIN-423] Add record count limit to salesforce source
Closes #2300 from yukuai518/limit
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5b8f7dfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5b8f7dfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5b8f7dfb
Branch: refs/heads/master
Commit: 5b8f7dfb10fa8eddaedfbb13df6adcaf018acf65
Parents: d323f60
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Mar 22 16:47:59 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 22 16:47:59 2018 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 7 +-
.../java/org/apache/gobblin/source/Source.java | 9 ++
.../apache/gobblin/cluster/GobblinHelixJob.java | 16 +--
.../cluster/GobblinHelixJobScheduler.java | 119 +++++++++++++++----
.../extractor/extract/AbstractSource.java | 5 +
.../source/extractor/extract/ExtractType.java | 8 +-
.../extractor/extract/QueryBasedSource.java | 2 +-
.../source/extractor/partition/Partitioner.java | 28 +++--
.../source/extractor/watermark/Watermark.java | 12 +-
.../gobblin/runtime/AbstractJobLauncher.java | 4 +
.../org/apache/gobblin/runtime/JobLauncher.java | 4 +
.../apache/gobblin/runtime/SourceDecorator.java | 5 +
.../apache/gobblin/scheduler/JobScheduler.java | 39 ++++--
.../gobblin/salesforce/SalesforceSource.java | 72 +++++++++--
14 files changed, 260 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 612fd8b..70459a2 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -88,6 +88,10 @@ public class ConfigurationKeys {
/**
* Job scheduler configuration properties.
*/
+ // Job retriggering
+ public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
+ public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
+
// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
public static final int DEFAULT_JOB_EXECUTOR_THREAD_POOL_SIZE = 5;
@@ -473,6 +477,8 @@ public class ConfigurationKeys {
public static final String SOURCE_MAX_NUMBER_OF_PARTITIONS = "source.max.number.of.partitions";
public static final String SOURCE_SKIP_FIRST_RECORD = "source.skip.first.record";
public static final String SOURCE_COLUMN_NAME_CASE = "source.column.name.case";
+ public static final String SOURCE_EARLY_STOP_ENABLED = "source.earlyStop.enabled";
+ public static final boolean DEFAULT_SOURCE_EARLY_STOP_ENABLED = false;
/**
* Configuration properties used by the QueryBasedExtractor.
@@ -583,7 +589,6 @@ public class ConfigurationKeys {
public static final String DEFAULT_SOURCE_QUERYBASED_IS_METADATA_COLUMN_CHECK_ENABLED = "true";
public static final String DEFAULT_COLUMN_NAME_CASE = "NOCHANGE";
public static final int DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE = 1000;
-
public static final String FILEBASED_REPORT_STATUS_ON_COUNT = "filebased.report.status.on.count";
public static final int DEFAULT_FILEBASED_REPORT_STATUS_ON_COUNT = 10000;
public static final String DEFAULT_SOURCE_TIMEZONE = PST_TIMEZONE_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
index 1edfd8d..37e73c8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/Source.java
@@ -94,4 +94,13 @@ public interface Source<S, D> {
* @param state see {@link SourceState}
*/
public abstract void shutdown(SourceState state);
+
+ /**
+ * Instead of handling all {@link WorkUnit}s in one run, some {@link Source} may choose to stop early in order to handle the
+ * proper workload, which can cause multiple runs after the initial run.
+ * @return If the same job has early stopped
+ */
+ public default boolean isEarlyStopped() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index b8fd2f4..bc9f88f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -17,15 +17,11 @@
package org.apache.gobblin.cluster;
-import java.util.List;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-import org.apache.helix.HelixManager;
import org.quartz.InterruptableJob;
import org.quartz.Job;
@@ -34,8 +30,6 @@ import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
@@ -56,24 +50,18 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob
@Override
public void executeImpl(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- ConcurrentHashMap runningMap = (ConcurrentHashMap)dataMap.get(GobblinHelixJobScheduler.JOB_RUNNING_MAP);
final JobScheduler jobScheduler = (JobScheduler) dataMap.get(JobScheduler.JOB_SCHEDULER_KEY);
// the properties may get mutated during job execution and the scheduler reuses it for the next round of scheduling,
// so clone it
final Properties jobProps = (Properties)((Properties) dataMap.get(JobScheduler.PROPERTIES_KEY)).clone();
final JobListener jobListener = (JobListener) dataMap.get(JobScheduler.JOB_LISTENER_KEY);
- HelixManager helixManager = (HelixManager) dataMap.get(GobblinHelixJobScheduler.HELIX_MANAGER_KEY);
- Path appWorkDir = (Path) dataMap.get(GobblinHelixJobScheduler.APPLICATION_WORK_DIR_KEY);
- @SuppressWarnings("unchecked")
- List<? extends Tag<?>> eventMetadata = (List<? extends Tag<?>>) dataMap.get(GobblinHelixJobScheduler.METADATA_TAGS);
try {
- final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata, runningMap);
if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
- jobScheduler.runJob(jobProps, jobListener, jobLauncher);
+ jobScheduler.runJob(jobProps, jobListener);
} else {
- cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher);
+ cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener);
}
} catch (Throwable t) {
throw new JobExecutionException(t);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 48b12f2..ef12162 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -19,23 +19,25 @@ package org.apache.gobblin.cluster;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -47,8 +49,6 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
-import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
@@ -66,6 +66,7 @@ import org.apache.gobblin.scheduler.SchedulerService;
import javax.annotation.Nonnull;
+import lombok.Getter;
/**
@@ -79,11 +80,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
- static final String HELIX_MANAGER_KEY = "helixManager";
- static final String APPLICATION_WORK_DIR_KEY = "applicationWorkDir";
- static final String METADATA_TAGS = "metadataTags";
- static final String JOB_RUNNING_MAP = "jobRunningMap";
-
private final Properties properties;
private final HelixManager helixManager;
private final EventBus eventBus;
@@ -262,13 +258,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
@Override
public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
- Map<String, Object> additionalJobDataMap = Maps.newHashMap();
- additionalJobDataMap.put(HELIX_MANAGER_KEY, this.helixManager);
- additionalJobDataMap.put(APPLICATION_WORK_DIR_KEY, this.appWorkDir);
- additionalJobDataMap.put(METADATA_TAGS, this.metadataTags);
- additionalJobDataMap.put(JOB_RUNNING_MAP, this.jobRunningMap);
try {
- scheduleJob(jobProps, jobListener, additionalJobDataMap, GobblinHelixJob.class);
+ scheduleJob(jobProps, jobListener, Maps.newHashMap(), GobblinHelixJob.class);
} catch (Exception e) {
throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
}
@@ -280,19 +271,101 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
- try {
- JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps);
- runJob(jobProps, jobListener, jobLauncher);
- } catch (Exception e) {
- throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
- }
+ new RetriggeringJobCallable(jobProps, jobListener).call();
}
- private GobblinHelixJobLauncher buildGobblinHelixJobLauncher(Properties jobProps)
+ @Override
+ public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps)
throws Exception {
return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap);
}
+ private class RetriggeringJobCallable implements Callable {
+ Properties jobProps;
+ JobListener jobListener;
+
+ public RetriggeringJobCallable(Properties jobProps, JobListener jobListener) {
+ this.jobProps = jobProps;
+ this.jobListener = jobListener;
+ }
+
+ private boolean isRetriggeringEnabled() {
+ return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED, ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED);
+ }
+
+ @Getter
+ JobLauncher currentJobLauncher = null;
+
+ @Override
+ public Void call() throws JobException {
+ try {
+ while (true) {
+ currentJobLauncher = buildJobLauncher(jobProps);
+ boolean isEarlyStopped = runJob(jobProps, jobListener, currentJobLauncher);
+ boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+ if (isEarlyStopped && isRetriggerEnabled) {
+ LOGGER.info("Job {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ } else {
+ break;
+ }
+ currentJobLauncher = null;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ }
+
+ return null;
+ }
+ }
+
+ public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) {
+ RetriggeringJobCallable retriggeringJob = new RetriggeringJobCallable(jobProps, jobListener);
+ final Future<?> future = this.jobExecutor.submit(retriggeringJob);
+ return new Future() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!GobblinHelixJobScheduler.this.isCancelRequested()) {
+ return false;
+ }
+ boolean result = true;
+ try {
+ JobLauncher jobLauncher = retriggeringJob.getCurrentJobLauncher();
+ if (jobLauncher != null) {
+ jobLauncher.cancelJob(jobListener);
+ }
+ } catch (JobException e) {
+ LOGGER.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ result = false;
+ }
+ if (mayInterruptIfRunning) {
+ result &= future.cancel(true);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return future.isDone();
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ return future.get();
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return future.get(timeout, unit);
+ }
+ };
+ }
+
@Subscribe
public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
index db51597..d7fd05e 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/AbstractSource.java
@@ -139,4 +139,9 @@ public abstract class AbstractSource<S, D> implements Source<S, D> {
public Extract createExtract(TableType type, String namespace, String table) {
return this.extractFactory.getUniqueExtract(type, namespace, table);
}
+
+ @Override
+ public boolean isEarlyStopped() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
index a8ae7bf..49cfae9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/ExtractType.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.source.extractor.extract;
+/**
+ * Different extract types
+ */
public enum ExtractType {
- SNAPSHOT, APPEND_DAILY, APPEND_HOURLY, APPEND_BATCH
+ SNAPSHOT, // Used iff user wants highwatermark to be set to latest.
+ APPEND_DAILY, // Used iff user wants highwatermark to be set to a fixed point, like CURRENTDATE - <backoff days>.
+ APPEND_HOURLY, // Used iff user wants highwatermark to be set to a fixed point, like CURRENTHOUR - <backoff hours>.
+ APPEND_BATCH // <Please document this>
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index 29c98d9..5d5330d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -198,7 +198,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
combinedState.setProp(ConfigurationKeys.SOURCE_QUERYBASED_END_VALUE, previousWatermark);
}
- workUnits.addAll(generateWorkUnits(sourceEntity, state, previousWatermark));
+ workUnits.addAll(generateWorkUnits(sourceEntity, combinedState, previousWatermark));
}
log.info("Total number of workunits for the current run: " + workUnits.size());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
index 90da5b5..052d6b4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
@@ -53,6 +53,7 @@ public class Partitioner {
public static final String WATERMARKTIMEFORMAT = "yyyyMMddHHmmss";
public static final String HAS_USER_SPECIFIED_PARTITIONS = "partitioner.hasUserSpecifiedPartitions";
public static final String USER_SPECIFIED_PARTITIONS = "partitioner.userSpecifiedPartitions";
+ public static final String IS_EARLY_STOPPED = "partitioner.isEarlyStopped";
public static final Comparator<Partition> ascendingComparator = new Comparator<Partition>() {
@Override
@@ -109,7 +110,9 @@ public class Partitioner {
* Get partitions with low and high water marks
*
* @param previousWatermark previous water mark from metadata
- * @return map of partition intervals
+ * @return map of partition intervals.
+ * map's key is interval begin time (in format {@link Partitioner#WATERMARKTIMEFORMAT})
+ * map's value is interval end time (in format {@link Partitioner#WATERMARKTIMEFORMAT})
*/
@Deprecated
public HashMap<Long, Long> getPartitions(long previousWatermark) {
@@ -196,9 +199,12 @@ public class Partitioner {
* Generate the partitions based on the lists specified by the user in job config
*/
private List<Partition> createUserSpecifiedPartitions() {
+
List<Partition> partitions = new ArrayList<>();
List<String> watermarkPoints = state.getPropAsList(USER_SPECIFIED_PARTITIONS);
+ boolean isEarlyStopped = state.getPropAsBoolean(IS_EARLY_STOPPED);
+
if (watermarkPoints == null || watermarkPoints.size() == 0 ) {
LOG.info("There should be some partition points");
long defaultWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
@@ -234,7 +240,9 @@ public class Partitioner {
highWatermark = adjustWatermark(watermarkPoints.get(i), watermarkType);
ExtractType extractType =
ExtractType.valueOf(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE).toUpperCase());
- if (isFullDump() || isSnapshot(extractType)) {
+
+ // If it is early stop, we should not remove upper bounds
+ if ((isFullDump() || isSnapshot(extractType)) && !isEarlyStopped) {
// The upper bounds can be removed for last work unit
partitions.add(new Partition(lowWatermark, highWatermark, true, false));
} else {
@@ -242,6 +250,7 @@ public class Partitioner {
partitions.add(new Partition(lowWatermark, highWatermark, true, true));
}
+
return partitions;
}
@@ -291,18 +300,21 @@ public class Partitioner {
}
/**
- * Get low water mark
+ * Get low water mark:
+ * (1) Use {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff it is a full dump (or watermark override is enabled)
+ * (2) Otherwise use previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
*
* @param extractType Extract type
* @param watermarkType Watermark type
* @param previousWatermark Previous water mark
* @param deltaForNextWatermark delta number for next water mark
- * @return low water mark
+ * @return low water mark in {@link Partitioner#WATERMARKTIMEFORMAT}
*/
@VisibleForTesting
protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkType, long previousWatermark,
int deltaForNextWatermark) {
long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
+
if (this.isFullDump() || this.isWatermarkOverride()) {
String timeZone =
this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE, ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE);
@@ -331,7 +343,7 @@ public class Partitioner {
* @param watermarkType Watermark type
* @param previousWatermark Previous water mark
* @param deltaForNextWatermark delta number for next water mark
- * @return snapshot low water mark
+ * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
*/
private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
LOG.debug("Getting snapshot low water mark");
@@ -362,7 +374,7 @@ public class Partitioner {
* @param watermarkType Watermark type
* @param previousWatermark Previous water mark
* @param deltaForNextWatermark delta number for next water mark
- * @return append low water mark
+ * @return Previous watermark (fallback to {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE} iff previous watermark is unavailable)
*/
private long getAppendLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
LOG.debug("Getting append low water mark");
@@ -384,7 +396,7 @@ public class Partitioner {
*
* @param extractType Extract type
* @param watermarkType Watermark type
- * @return high water mark
+ * @return high water mark in {@link Partitioner#WATERMARKTIMEFORMAT}
*/
@VisibleForTesting
protected long getHighWatermark(ExtractType extractType, WatermarkType watermarkType) {
@@ -598,6 +610,8 @@ public class Partitioner {
}
/**
+ * If full dump is true, the low watermark will be based on {@link ConfigurationKeys#SOURCE_QUERYBASED_START_VALUE}
+ * Otherwise it will base on the previous watermark. Please refer to {@link Partitioner#getLowWatermark(ExtractType, WatermarkType, long, int)}
* @return full dump or not
*/
public boolean isFullDump() {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
index e83fc65..53827b4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/Watermark.java
@@ -26,8 +26,8 @@ public interface Watermark {
* Condition statement with the water mark value using the operator. Example (last_updated_ts >= 2013-01-01 00:00:00
*
* @param extractor
- * @param water mark value
- * @param relational operator between water mark column and value
+ * @param watermarkValue mark value
+ * @param operator operator between water mark column and value
* @return condition statement
*/
public String getWatermarkCondition(QueryBasedExtractor<?, ?> extractor, long watermarkValue, String operator);
@@ -35,10 +35,10 @@ public interface Watermark {
/**
* Get partitions for the given range
*
- * @param low water mark value
- * @param high water mark value
- * @param partition interval(in hours or days)
- * @param maximum number of partitions
+ * @param lowWatermarkValue water mark value
+ * @param highWatermarkValue water mark value
+ * @param partitionInterval interval(in hours or days)
+ * @param maxIntervals number of partitions
* @return partitions
*/
public HashMap<Long, Long> getIntervals(long lowWatermarkValue, long highWatermarkValue, long partitionInterval,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index da770af..0c50c32 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -887,6 +887,10 @@ public abstract class AbstractJobLauncher implements JobLauncher {
}
}
+ public boolean isEarlyStopped() {
+ return this.jobContext.getSource().isEarlyStopped();
+ }
+
/**
* Staging data cannot be cleaned if exactly once semantics is used, and the job has unfinished
* commit sequences.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
index 75c558f..52284b7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncher.java
@@ -67,4 +67,8 @@ public interface JobLauncher extends Closeable {
*/
public void cancelJob(@Nullable JobListener jobListener)
throws JobException;
+
+ public default boolean isEarlyStopped() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index 9471444..4558325 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -116,4 +116,9 @@ public class SourceDecorator<S, D> implements WorkUnitStreamSource<S, D>, Decora
public Object getDecoratedObject() {
return this.source;
}
+
+ @Override
+ public boolean isEarlyStopped() {
+ return this.source.isEarlyStopped();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 7ef6d85..f2e254d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -24,14 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hadoop.fs.Path;
@@ -63,6 +62,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.AbstractIdleService;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -135,6 +135,7 @@ public class JobScheduler extends AbstractIdleService {
private final Closer closer = Closer.create();
+ @Getter
private volatile boolean cancelRequested = false;
public JobScheduler(Properties properties, SchedulerService scheduler)
@@ -254,17 +255,20 @@ public class JobScheduler extends AbstractIdleService {
* with scheduling the job
*/
public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) {
- Runnable runnable = new Runnable() {
+ Callable<Void> callable = new Callable<Void>() {
@Override
- public void run() {
+ public Void call() throws JobException {
try {
runJob(jobProps, jobListener, jobLauncher);
} catch (JobException je) {
LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ throw je;
}
+ return null;
}
};
- final Future<?> future = this.jobExecutor.submit(runnable);
+
+ final Future<?> future = this.jobExecutor.submit(callable);
return new Future() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@@ -306,6 +310,14 @@ public class JobScheduler extends AbstractIdleService {
};
}
+ public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) throws JobException {
+ try {
+ return scheduleJobImmediately(jobProps, jobListener, buildJobLauncher(jobProps));
+ } catch (Exception e) {
+ throw new JobException("Job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + " cannot be immediately scheduled.", e);
+ }
+ }
+
/**
* Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}.
* @param runnable the runnable to submit to the job executor
@@ -419,12 +431,16 @@ public class JobScheduler extends AbstractIdleService {
public void runJob(Properties jobProps, JobListener jobListener)
throws JobException {
try {
- runJob(jobProps, jobListener, JobLauncherFactory.newJobLauncher(this.properties, jobProps));
+ // Launcher creation is delegated to buildJobLauncher. This overload discards the boolean
+ // "needs retriggering" result of the three-argument runJob.
+ // NOTE(review): catch (Exception) also catches the JobException thrown by that runJob, so
+ // callers receive it wrapped in a second JobException.
+ runJob(jobProps, jobListener, buildJobLauncher(jobProps));
} catch (Exception e) {
throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
}
}
+ /**
+ * Create a {@link JobLauncher} for the given job via {@link JobLauncherFactory#newJobLauncher},
+ * passing both this scheduler's properties and the job's properties.
+ *
+ * @param jobProps Job configuration properties
+ * @return the launcher returned by {@link JobLauncherFactory#newJobLauncher}
+ * @throws Exception propagated from {@link JobLauncherFactory#newJobLauncher}
+ */
+ public JobLauncher buildJobLauncher(Properties jobProps) throws Exception {
+ return JobLauncherFactory.newJobLauncher(this.properties, jobProps);
+ }
+
/**
* Run a job.
*
@@ -441,9 +457,10 @@ public class JobScheduler extends AbstractIdleService {
* @param jobProps Job configuration properties
* @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed.
* @param jobLauncher a {@link JobLauncher} object used to launch the job to run
+ * @return If current job needs retriggering
* @throws JobException when there is anything wrong with running the job
*/
- public void runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)
+ public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)
throws JobException {
Preconditions.checkArgument(jobProps.containsKey(ConfigurationKeys.JOB_NAME_KEY),
"A job must have a job name specified by job.name");
@@ -453,16 +470,20 @@ public class JobScheduler extends AbstractIdleService {
boolean disabled = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_DISABLED_KEY, "false"));
if (disabled) {
LOG.info("Skipping disabled job " + jobName);
- return;
+ return false;
}
// Launch the job
try (Closer closer = Closer.create()) {
closer.register(jobLauncher).launchJob(jobListener);
boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false"));
- if (runOnce && this.scheduledJobs.containsKey(jobName)) {
+ boolean isEarlyStopped = jobLauncher.isEarlyStopped();
+ if (!isEarlyStopped && runOnce && this.scheduledJobs.containsKey(jobName)) {
this.scheduler.getScheduler().deleteJob(this.scheduledJobs.remove(jobName));
}
+
+ return isEarlyStopped;
+
} catch (Throwable t) {
throw new JobException("Failed to launch and run job " + jobName, t);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5b8f7dfb/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 872fceb..b16a261 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -83,7 +83,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
public static final String USE_ALL_OBJECTS = "use.all.objects";
public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
- private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
@@ -101,10 +100,15 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} "
+ "WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
+ private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
+ // With early stop enabled, histogram groups are accumulated only until their running record
+ // count exceeds this limit. The job then uses an earlier high watermark.
+ private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
+ private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4;
+
private static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss";
private static final String ZERO_TIME_SUFFIX = "-00:00:00";
private static final Gson GSON = new Gson();
+ private boolean isEarlyStopped = false;
@VisibleForTesting
SalesforceSource(LineageInfo lineageInfo) {
@@ -122,6 +126,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
}
+ /**
+ * Report whether work-unit generation stopped short of the expected high watermark because the
+ * accumulated histogram record count exceeded the early-stop limit.
+ *
+ * NOTE(review): this flag starts as {@code false} and is only ever set to {@code true} in the
+ * visible code. Confirm that a SalesforceSource instance is not reused across runs.
+ */
@Override
+ public boolean isEarlyStopped() {
+ return isEarlyStopped;
+ }
+
+ @Override
protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
DatasetDescriptor source =
new DatasetDescriptor(DatasetConstants.PLATFORM_SALESFORCE, entity.getSourceEntityName());
@@ -147,17 +156,56 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
return super.generateWorkUnits(sourceEntity, state, previousWatermark);
}
- Partition partition = new Partitioner(state).getGlobalPartition(previousWatermark);
+ Partitioner partitioner = new Partitioner(state);
+ if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+ throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
+ }
+
+ Partition partition = partitioner.getGlobalPartition(previousWatermark);
Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition);
- String specifiedPartitions = generateSpecifiedPartitions(histogram, minTargetPartitionSize, maxPartitions,
- partition.getLowWatermark(), partition.getHighWatermark());
+ // we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large
+
+ Histogram histogramAdjust;
+
+ // TODO: we should consider move this logic into getRefinedHistogram so that we can early terminate the search
+ if (isEarlyStopEnabled(state)) {
+ histogramAdjust = new Histogram();
+ for (HistogramGroup group : histogram.getGroups()) {
+ histogramAdjust.add(group);
+ if (histogramAdjust.getTotalRecordCount() > state
+ .getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT)) {
+ break;
+ }
+ }
+ } else {
+ histogramAdjust = histogram;
+ }
+
+ long expectedHighWatermark = partition.getHighWatermark();
+ if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
+ HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
+ long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
+ log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
+ this.isEarlyStopped = true;
+ expectedHighWatermark = earlyStopHighWatermark;
+ } else {
+ log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark);
+ }
+
+ String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions,
+ partition.getLowWatermark(), expectedHighWatermark);
state.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true);
state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions);
+ state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped);
return super.generateWorkUnits(sourceEntity, state, previousWatermark);
}
+ /**
+ * Read whether early-stop mode is enabled from {@link ConfigurationKeys#SOURCE_EARLY_STOP_ENABLED},
+ * falling back to {@link ConfigurationKeys#DEFAULT_SOURCE_EARLY_STOP_ENABLED}.
+ */
+ private boolean isEarlyStopEnabled (State state) {
+ return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED);
+ }
+
String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSize, int maxPartitions, long lowWatermark,
long expectedHighWatermark) {
int interval = computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions);
@@ -183,7 +231,12 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
partitionPoints.add(Utils.toDateTimeFormat(group.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
}
- // Move the candidate to a new bucket if the attempted total is 2x of interval
+ /**
+ * Greedy bucketing: keep adding groups to the current bucket until adding the next group would
+ * bring its total to at least 2x the interval size.
+ * Rationale: if the nth group would push the total to 2 x interval or more, then groups 0 through
+ * (n-1) plus group n together hold at least 2 x interval records. Hence the intervals are saturated
+ * (at their original size) without leaving unused space between them. A multiplier of 3x, 4x, ...
+ * would also work but is less space-efficient.
+ */
if (count != 0 && count + group.count >= 2 * interval) {
// Summarize current group
statistics.addValue(count);
@@ -351,7 +404,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
// make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group
List<HistogramGroup> list = new ArrayList(histogram.getGroups());
- Date hwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+ Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SECONDS_FORMAT), 0));
for (int i = 0; i < list.size() - 1; i++) {
@@ -447,7 +500,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
// exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured
// in the range of generated partitions
- HistogramGroup firstGroup = histogram.getGroups().get(0);
+ HistogramGroup firstGroup = histogram.get(0);
Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SECONDS_FORMAT),
firstGroup.getCount()));
@@ -523,6 +576,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
totalRecordCount += histogram.totalRecordCount;
}
+ /**
+ * Return the histogram group at position {@code idx} by delegating to the underlying groups list.
+ */
+ HistogramGroup get(int idx) {
+ return this.groups.get(idx);
+ }
+
@Override
public String toString() {
return groups.toString();
@@ -555,7 +612,6 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
} catch (RestApiProcessingException e) {
throw Throwables.propagate(e);
}
-
}
private static Set<SourceEntity> getSourceEntities(String response) {