You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/07/29 02:27:24 UTC
[1/2] incubator-gobblin git commit: Allow GobblinHelixJobScheduler to
disable the services started by its superclass.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d9d7d5f0c -> 467fe8fc8
Allow GobblinHelixJobScheduler to disable the services started by its superclass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc3d1227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc3d1227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc3d1227
Branch: refs/heads/master
Commit: dc3d122769ed6c3afcb8e9af6210f594b5fc7585
Parents: 0975312
Author: Joel Baranick <jo...@ensighten.com>
Authored: Mon May 1 09:58:37 2017 -0700
Committer: Joel Baranick <jo...@ensighten.com>
Committed: Fri Jul 28 12:46:09 2017 -0700
----------------------------------------------------------------------
.../cluster/GobblinHelixJobScheduler.java | 4 ++
.../java/gobblin/scheduler/JobScheduler.java | 48 ++++++++++++--------
2 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
index 05595bc..c598c72 100644
--- a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -103,6 +103,10 @@ public class GobblinHelixJobScheduler extends JobScheduler {
}
@Override
+ protected void startServices() throws Exception {
+ }
+
+ @Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
index 5eb1a6a..2736637 100644
--- a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
@@ -17,9 +17,8 @@
package gobblin.scheduler;
+import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -50,7 +49,6 @@ import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -131,6 +129,8 @@ public class JobScheduler extends AbstractIdleService {
// A period of time for scheduler to wait until jobs are finished
private final boolean waitForJobCompletion;
+ private final Closer closer = Closer.create();
+
public JobScheduler(Properties properties, SchedulerService scheduler)
throws Exception {
this.properties = properties;
@@ -155,14 +155,6 @@ public class JobScheduler extends AbstractIdleService {
this.properties.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION));
- if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) &&
- !this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
- String path = FileSystems.getDefault()
- .getPath(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY))
- .normalize().toAbsolutePath().toString();
- this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, "file:///" + path);
- }
-
if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
this.jobConfigFileDirPath = new Path(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY));
this.listener = new PathAlterationListenerAdaptorForMonitor(jobConfigFileDirPath, this);
@@ -185,28 +177,34 @@ public class JobScheduler extends AbstractIdleService {
}
// Note: This should not be mandatory, gobblin-cluster modes have their own job configuration managers
- if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
- startGeneralJobConfigFileMonitor();
- scheduleGeneralConfiguredJobs();
+ if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)
+ || this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
+
+ if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) && !this.properties.containsKey(
+ ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
+ this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ "file://" + this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY));
+ }
+ startServices();
}
}
+ protected void startServices() throws Exception {
+ startGeneralJobConfigFileMonitor();
+ scheduleGeneralConfiguredJobs();
+ }
+
@Override
protected void shutDown()
throws Exception {
LOG.info("Stopping the job scheduler");
-
- if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY) || this.properties.containsKey(
- ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)) {
- this.pathAlterationDetector.stop(1000);
- }
+ closer.close();
List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs();
for (JobExecutionContext jobExecutionContext : currentExecutions) {
this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
}
-
ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
}
@@ -454,6 +452,16 @@ public class JobScheduler extends AbstractIdleService {
throws Exception {
SchedulerUtils.addPathAlterationObserver(this.pathAlterationDetector, this.listener, jobConfigFileDirPath);
this.pathAlterationDetector.start();
+ this.closer.register(new Closeable() {
+ @Override
+ public void close() throws IOException {
+ try {
+ pathAlterationDetector.stop(1000);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ });
}
/**
[2/2] incubator-gobblin git commit: Merge pull request #2026 from
kadaan/Fix_for_GOBBLIN-29
Posted by ab...@apache.org.
Merge pull request #2026 from kadaan/Fix_for_GOBBLIN-29
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/467fe8fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/467fe8fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/467fe8fc
Branch: refs/heads/master
Commit: 467fe8fc81242a0d669de57a2404202078096152
Parents: d9d7d5f dc3d122
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Fri Jul 28 19:26:58 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Jul 28 19:26:58 2017 -0700
----------------------------------------------------------------------
.../cluster/GobblinHelixJobScheduler.java | 4 ++
.../java/gobblin/scheduler/JobScheduler.java | 48 ++++++++++++--------
2 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------