You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/14 21:51:51 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-856] make job status monitor a top level service
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fb0c72d [GOBBLIN-856] make job status monitor a top level service
fb0c72d is described below
commit fb0c72dc7adb3e97cdccc010b66c29646ac747f2
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Aug 14 14:51:41 2019 -0700
[GOBBLIN-856] make job status monitor a top level service
Closes #2712 from arjun4084346/jobstatusmonitorservice
---
.../java/org/apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../gobblin/service/modules/core/GobblinServiceManager.java | 13 +++++++++++++
.../gobblin/service/modules/orchestration/DagManager.java | 9 ---------
.../gobblin/service/monitoring/KafkaJobStatusMonitor.java | 1 +
.../gobblin/service/modules/core/GobblinServiceHATest.java | 1 +
.../service/modules/core/GobblinServiceManagerTest.java | 2 ++
6 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c984b54..e40d3de 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -33,6 +33,7 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
+ public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
// Helix / ServiceScheduler Keys
public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 23cbc5d..6c42d49 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -96,6 +96,8 @@ import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -130,6 +132,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
protected final boolean isTopologySpecFactoryEnabled;
protected final boolean isGitConfigMonitorEnabled;
protected final boolean isDagManagerEnabled;
+ protected final boolean isJobStatusMonitorEnabled;
protected TopologyCatalog topologyCatalog;
@Getter
@@ -155,6 +158,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
protected DagManager dagManager;
+ protected KafkaJobStatusMonitor jobStatusMonitor;
+
@Getter
protected Config config;
private final MetricContext metricContext;
@@ -226,6 +231,13 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.serviceLauncher.addService(this.dagManager);
}
+ this.isJobStatusMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true) ;
+ // Initialize JobStatusMonitor
+ if (this.isJobStatusMonitorEnabled) {
+ this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+ this.serviceLauncher.addService(this.jobStatusMonitor);
+ }
+
// Initialize ServiceScheduler
this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
@@ -389,6 +401,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
} else if (this.helixManager.isPresent()) {
LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());
+
if (this.isSchedulerEnabled) {
LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
this.scheduler.setActive(false);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a8c2d19..1ae4fb7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -162,7 +162,6 @@ public class DagManager extends AbstractIdleService {
private final Integer pollingInterval;
@Getter
private final JobStatusRetriever jobStatusRetriever;
- private final KafkaJobStatusMonitor jobStatusMonitor;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
@@ -184,7 +183,6 @@ public class DagManager extends AbstractIdleService {
}
try {
- this.jobStatusMonitor = createJobStatusMonitor(config);
this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager initialization", e);
@@ -312,10 +310,6 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
- if ((this.jobStatusMonitor != null) && (!this.jobStatusMonitor.isRunning())) {
- log.info("Starting job status monitor");
- jobStatusMonitor.startAsync().awaitRunning();
- }
for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
addDag(dag);
}
@@ -327,8 +321,6 @@ public class DagManager extends AbstractIdleService {
} catch (InterruptedException e) {
log.error("Exception encountered when shutting down DagManager threads.", e);
}
- log.info("Shutting down JobStatusMonitor");
- this.jobStatusMonitor.shutDown();
}
} catch (IOException e) {
log.error("Exception encountered when activating the new DagManager", e);
@@ -815,6 +807,5 @@ public class DagManager extends AbstractIdleService {
throws Exception {
this.scheduledExecutorPool.shutdown();
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
- this.jobStatusMonitor.shutDown();
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c46e01e..8c8ea2a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -197,4 +197,5 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index a4a4497..5ae0b37 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -139,6 +139,7 @@ public class GobblinServiceHATest {
commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+ commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
Properties node1ServiceCoreProperties = new Properties();
node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index d5105be..1d1736c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -131,6 +131,8 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, JOB_STATUS_STATE_STORE_DIR);
+ serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
fileKey.open(false).create(true);