You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/14 21:51:51 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-856] make job status monitor a top level service

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fb0c72d  [GOBBLIN-856] make job status monitor a top level service
fb0c72d is described below

commit fb0c72dc7adb3e97cdccc010b66c29646ac747f2
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Aug 14 14:51:41 2019 -0700

    [GOBBLIN-856] make job status monitor a top level service
    
    Closes #2712 from
    arjun4084346/jobstatusmonitorservice
---
 .../java/org/apache/gobblin/service/ServiceConfigKeys.java  |  1 +
 .../gobblin/service/modules/core/GobblinServiceManager.java | 13 +++++++++++++
 .../gobblin/service/modules/orchestration/DagManager.java   |  9 ---------
 .../gobblin/service/monitoring/KafkaJobStatusMonitor.java   |  1 +
 .../gobblin/service/modules/core/GobblinServiceHATest.java  |  1 +
 .../service/modules/core/GobblinServiceManagerTest.java     |  2 ++
 6 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c984b54..e40d3de 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -33,6 +33,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
   public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
   public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
+  public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
 
   // Helix / ServiceScheduler Keys
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 23cbc5d..6c42d49 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -96,6 +96,8 @@ import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -130,6 +132,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   protected final boolean isTopologySpecFactoryEnabled;
   protected final boolean isGitConfigMonitorEnabled;
   protected final boolean isDagManagerEnabled;
+  protected final boolean isJobStatusMonitorEnabled;
 
   protected TopologyCatalog topologyCatalog;
   @Getter
@@ -155,6 +158,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
   protected DagManager dagManager;
 
+  protected KafkaJobStatusMonitor jobStatusMonitor;
+
   @Getter
   protected Config config;
   private final MetricContext metricContext;
@@ -226,6 +231,13 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       this.serviceLauncher.addService(this.dagManager);
     }
 
+    this.isJobStatusMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true) ;
+    // Initialize JobStatusMonitor
+    if (this.isJobStatusMonitorEnabled) {
+      this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+      this.serviceLauncher.addService(this.jobStatusMonitor);
+    }
+
     // Initialize ServiceScheduler
     this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
@@ -389,6 +401,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     } else if (this.helixManager.isPresent()) {
       LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
+
       if (this.isSchedulerEnabled) {
         LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
         this.scheduler.setActive(false);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a8c2d19..1ae4fb7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -162,7 +162,6 @@ public class DagManager extends AbstractIdleService {
   private final Integer pollingInterval;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
-  private final KafkaJobStatusMonitor jobStatusMonitor;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
 
@@ -184,7 +183,6 @@ public class DagManager extends AbstractIdleService {
     }
 
     try {
-      this.jobStatusMonitor = createJobStatusMonitor(config);
       this.jobStatusRetriever = createJobStatusRetriever(config);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Exception encountered during DagManager initialization", e);
@@ -312,10 +310,6 @@ public class DagManager extends AbstractIdleService {
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
-        if ((this.jobStatusMonitor != null) && (!this.jobStatusMonitor.isRunning())) {
-          log.info("Starting job status monitor");
-          jobStatusMonitor.startAsync().awaitRunning();
-        }
         for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
           addDag(dag);
         }
@@ -327,8 +321,6 @@ public class DagManager extends AbstractIdleService {
         } catch (InterruptedException e) {
           log.error("Exception encountered when shutting down DagManager threads.", e);
         }
-        log.info("Shutting down JobStatusMonitor");
-        this.jobStatusMonitor.shutDown();
       }
     } catch (IOException e) {
       log.error("Exception encountered when activating the new DagManager", e);
@@ -815,6 +807,5 @@ public class DagManager extends AbstractIdleService {
       throws Exception {
     this.scheduledExecutorPool.shutdown();
     this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
-    this.jobStatusMonitor.shutDown();
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c46e01e..8c8ea2a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -197,4 +197,5 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   }
 
   public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index a4a4497..5ae0b37 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -139,6 +139,7 @@ public class GobblinServiceHATest {
     commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
     commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
     commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+    commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
 
     Properties node1ServiceCoreProperties = new Properties();
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index d5105be..1d1736c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -131,6 +131,8 @@ public class GobblinServiceManagerTest {
 
     serviceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, JOB_STATUS_STATE_STORE_DIR);
 
+    serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
     fileKey.open(false).create(true);