You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 18:29:21 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1162] Provide an option to allow slow containers to commit su…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cd38ac  [GOBBLIN-1162] Provide an option to allow slow containers to commit su…
5cd38ac is described below

commit 5cd38ac8d26dd4bccd8582a7182893f29ab5a009
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Jun 1 11:29:14 2020 -0700

    [GOBBLIN-1162] Provide an option to allow slow containers to commit su…
    
    Closes #3002 from sv2000/containerSuicide
---
 .../cluster/ContainerHealthCheckException.java     |  36 +++++
 .../cluster/GobblinClusterConfigurationKeys.java   |   6 +
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  73 ++++++++-
 .../cluster/HelixAssignedParticipantCheck.java     |   5 +-
 .../gobblin/cluster/GobblinTaskRunnerTest.java     |  54 ++++++-
 .../reporter/KafkaKeyValueProducerPusherTest.java  |   4 +-
 .../metrics/reporter/KafkaProducerPusherTest.java  |   8 +-
 .../gobblin/runtime/HighLevelConsumerTest.java     |   5 +-
 .../extract/kafka/KafkaExtractorStatsTracker.java  |  72 ++++++++-
 .../extract/kafka/KafkaIngestionHealthCheck.java   | 174 +++++++++++++++++++++
 .../kafka/KafkaExtractorStatsTrackerTest.java      |  32 +++-
 .../kafka/KafkaIngestionHealthCheckTest.java       | 155 ++++++++++++++++++
 .../gobblin/service/GobblinServiceManagerTest.java |  19 ++-
 .../event/ContainerHealthCheckFailureEvent.java    |  55 +++++++
 .../gobblin/util/eventbus/EventBusFactory.java     |  67 ++++++++
 .../apache/gobblin/util/eventbus/EventBusKey.java  |  42 +++++
 .../gobblin/util/eventbus/EventBusFactoryTest.java |  65 ++++++++
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |   9 +-
 travis/test-groups.inc                             |   2 +-
 19 files changed, 854 insertions(+), 29 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java
new file mode 100644
index 0000000..443bde6
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+/**
+ * Signals that the container has failed one or more health checks. In other words, the container has detected
+ * itself to be in an unhealthy state. The application may want to catch this exception to take an appropriate
+ * action e.g. exiting with an appropriate exit code.
+ */
+public class ContainerHealthCheckException extends RuntimeException {
+  public ContainerHealthCheckException() {
+    super();
+  }
+
+  public ContainerHealthCheckException(String message) {
+    super(message);
+  }
+
+  public ContainerHealthCheckException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 128a5d6..88585cd 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -177,6 +177,12 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
   public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = false;
 
+  //Config to enable/disable container "suicide" on health check failures. To be used in execution modes where the exiting
+  // container can be replaced with another container, e.g. Gobblin-on-Yarn mode.
+  public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
+  public static final boolean DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
+
+
   //Config to enable/disable reuse of existing Helix Cluster
   public static final String HELIX_CLUSTER_OVERWRITE_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
   public static final boolean DEFAULT_HELIX_CLUSTER_OVERWRITE = true;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 103f0cc..68f98a2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -72,6 +72,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
@@ -80,15 +81,25 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 /**
@@ -130,6 +141,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   private final Optional<ContainerMetrics> containerMetrics;
   private final List<Service> services = Lists.newArrayList();
   private final Path appWorkPath;
+  //An EventBus instance that can be accessed from any component running within the worker process. The individual components can
+  // use this EventBus to communicate application-level health check results back to the
+  // GobblinTaskRunner.
+  private final EventBus containerHealthEventBus;
 
   @Getter
   private HelixManager jobHelixManager;
@@ -138,11 +153,16 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   private TaskStateModelFactory taskStateModelFactory;
   private boolean isTaskDriver;
   private boolean dedicatedTaskDriverCluster;
+  private boolean isContainerExitOnHealthCheckFailureEnabled;
+
   private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
   @Getter
   private volatile boolean started = false;
   private volatile boolean stopInProgress = false;
   private volatile boolean isStopped = false;
+  @Getter
+  @Setter
+  private volatile boolean healthCheckFailed = false;
 
   protected final String taskRunnerId;
   protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
@@ -177,6 +197,23 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     // in the application configuration have to be extracted and set before initializing HelixManager.
     HelixUtils.setSystemProperties(config);
 
+    this.isContainerExitOnHealthCheckFailureEnabled = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
+        GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED);
+
+    if (this.isContainerExitOnHealthCheckFailureEnabled) {
+      EventBus eventBus;
+      try {
+        eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+            SharedResourcesBrokerFactory.getImplicitBroker());
+      } catch (IOException e) {
+        logger.error("Could not find EventBus instance for container health check", e);
+        eventBus = null;
+      }
+      this.containerHealthEventBus = eventBus;
+    } else {
+      this.containerHealthEventBus = null;
+    }
+
     initHelixManager();
 
     this.containerMetrics = buildContainerMetrics();
@@ -276,7 +313,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   /**
    * Start this {@link GobblinTaskRunner} instance.
    */
-  public void start() {
+  public void start() throws ContainerHealthCheckException {
     logger.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId));
 
     // Add a shutdown hook so the task scheduler gets properly shutdown
@@ -314,6 +351,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
               this.taskRunnerId);
     }
 
+    if (this.containerHealthEventBus != null) {
+      //Register itself with the container health event bus instance to receive container health events
+      logger.info("Registering GobblinTaskRunner with ContainerHealthCheckEventBus..");
+      this.containerHealthEventBus.register(this);
+    }
+
     if (this.serviceManager != null) {
       this.serviceManager.startAsync();
       started = true;
@@ -321,6 +364,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     } else {
       started = true;
     }
+
+    //Check if the TaskRunner shutdown is invoked due to a health check failure. If yes, throw a RuntimeException
+    // that will be propagated to the caller.
+    if (this.isContainerExitOnHealthCheckFailureEnabled && GobblinTaskRunner.this.isHealthCheckFailed()) {
+      logger.error("GobblinTaskRunner finished due to health check failure.");
+      throw new ContainerHealthCheckException();
+    }
   }
 
   public synchronized void stop() {
@@ -670,6 +720,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     }
   }
 
+  @Subscribe
+  public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+    logger.error("Received {} from: {}", event.getClass().getSimpleName(), event.getClassName());
+    logger.error("Submitting a ContainerHealthCheckFailureEvent..");
+    submitEvent(event);
+    logger.error("Stopping GobblinTaskRunner...");
+    GobblinTaskRunner.this.setHealthCheckFailed(true);
+    GobblinTaskRunner.this.stop();
+  }
+
+  private void submitEvent(ContainerHealthCheckFailureEvent event) {
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(RootMetricContext.get(), getClass().getPackage().getName()).build();
+    GobblinEventBuilder eventBuilder = new GobblinEventBuilder(event.getClass().getSimpleName());
+    State taskState = ConfigUtils.configToState(event.getConfig());
+    //Add task metadata such as Helix taskId, containerId, and workflowId if configured
+    TaskEventMetadataGenerator taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
+    eventBuilder.addAdditionalMetadata(taskEventMetadataGenerator.getMetadata(taskState, event.getClass().getSimpleName()));
+    eventBuilder.addAdditionalMetadata(event.getMetadata());
+    eventSubmitter.submit(eventBuilder);
+  }
+
   private static String getApplicationId() {
     return "1";
   }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 2102365..9aa1eeb 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -66,8 +66,6 @@ public class HelixAssignedParticipantCheck implements CommitStep {
   private final int partitionNum;
   private final Config config;
 
-  private boolean isCompleted;
-
   /**
    * A method that uses the Singleton pattern to instantiate a {@link HelixManager} instance.
    * @param config
@@ -114,7 +112,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
    */
   @Override
   public boolean isCompleted() {
-    return isCompleted;
+    return false;
   }
 
   /**
@@ -157,7 +155,6 @@ public class HelixAssignedParticipantCheck implements CommitStep {
       isParticipant = true;
     }
 
-    this.isCompleted = true;
     if (!isParticipant) {
       throw new CommitStepException(String.format("Helix instance %s not the assigned participant for partition %d",this.helixInstanceName, this.partitionNum));
     }
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 98d13df..7a0b879 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,12 +40,16 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
 
 
 /**
@@ -68,9 +74,10 @@ public class GobblinTaskRunnerTest {
   private TestingServer testingZKServer;
 
   private GobblinTaskRunner gobblinTaskRunner;
+  private GobblinTaskRunner gobblinTaskRunnerHealthCheck;
+  private GobblinTaskRunner corruptGobblinTaskRunner;
 
   private GobblinClusterManager gobblinClusterManager;
-  private GobblinTaskRunner corruptGobblinTaskRunner;
   private String clusterName;
   private String corruptHelixInstance;
   private TaskAssignmentAfterConnectionRetry suite;
@@ -101,7 +108,14 @@ public class GobblinTaskRunnerTest {
     this.gobblinTaskRunner =
         new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
             TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
-    this.gobblinTaskRunner.connectHelixManager();
+
+    // Participant
+    String healthCheckInstance = HelixUtils.getHelixInstanceName("HealthCheckHelixInstance", 0);
+    this.gobblinTaskRunnerHealthCheck =
+        new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, healthCheckInstance,
+            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID,
+            config.withValue(GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED, ConfigValueFactory.fromAnyRef(true))
+        , Optional.<Path>absent());
 
     // Participant with a partial Instance set up on Helix/ZK
     this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
@@ -118,6 +132,8 @@ public class GobblinTaskRunnerTest {
 
   @Test
   public void testSendReceiveShutdownMessage() throws Exception {
+    this.gobblinTaskRunner.connectHelixManager();
+
     ExecutorService service = Executors.newSingleThreadExecutor();
     service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start());
 
@@ -148,7 +164,6 @@ public class GobblinTaskRunnerTest {
     Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
   }
 
-
   @Test
   public void testConnectHelixManagerWithRetry() {
     HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
@@ -196,6 +211,38 @@ public class GobblinTaskRunnerTest {
     helixManager.disconnect();
   }
 
+  @Test (groups = {"disabledOnTravis"}, dependsOnMethods = "testSendReceiveShutdownMessage", expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*ContainerHealthCheckException.*")
+  public void testShutdownOnHealthCheckFailure() throws Exception {
+    this.gobblinTaskRunnerHealthCheck.connectHelixManager();
+
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    Future future = service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.start());
+
+    Logger log = LoggerFactory.getLogger("testHandleContainerHealthCheckFailure");
+
+    // Give Helix some time to start the task runner
+    AssertWithBackoff.create().logger(log).timeoutMs(20000)
+        .assertTrue(new Predicate<Void>() {
+          @Override public boolean apply(Void input) {
+            return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStarted();
+          }
+        }, "gobblinTaskRunner started");
+
+    EventBus eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+    eventBus.post(new ContainerHealthCheckFailureEvent(ConfigFactory.empty(), getClass().getName()));
+
+    // Give some time to allow GobblinTaskRunner to handle the ContainerHealthCheckFailureEvent
+    AssertWithBackoff.create().logger(log).timeoutMs(30000)
+        .assertTrue(new Predicate<Void>() {
+          @Override public boolean apply(Void input) {
+            return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStopped();
+          }
+        }, "gobblinTaskRunner stopped");
+
+    //Call Future#get() to check and ensure that ContainerHealthCheckException is thrown
+    future.get();
+  }
 
   public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
     TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) {
@@ -223,6 +270,7 @@ public class GobblinTaskRunnerTest {
       this.gobblinClusterManager.disconnectHelixManager();
       this.gobblinTaskRunner.disconnectHelixManager();
       this.corruptGobblinTaskRunner.disconnectHelixManager();
+      this.gobblinTaskRunnerHealthCheck.disconnectHelixManager();
       if (this.suite != null) {
         this.suite.shutdownCluster();
       }
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
index 1681d7b..f0914b6 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
@@ -58,9 +58,9 @@ public class KafkaKeyValueProducerPusherTest {
   @Test
   public void test() throws IOException {
     // Test that the scoped config overrides the generic config
-    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
+    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("127.0.0.1:dummy", TOPIC,
         Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
-            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
 
     String msg1 = "msg1";
     String msg2 = "msg2";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
index 723f8b7..5531213 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -30,12 +30,12 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigFactory;
 
+import kafka.consumer.ConsumerIterator;
+
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
 import org.apache.gobblin.metrics.kafka.Pusher;
 
-import kafka.consumer.ConsumerIterator;
-
 
 /**
  * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
@@ -56,8 +56,8 @@ public class KafkaProducerPusherTest {
   @Test
   public void test() throws IOException {
     // Test that the scoped config overrides the generic config
-    Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
-        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+    Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
 
     String msg1 = "msg1";
     String msg2 = "msg2";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index c101d15..a54a75c 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.test.TestUtils;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
@@ -67,7 +68,7 @@ public class HighLevelConsumerTest extends KafkaTestBase {
   public HighLevelConsumerTest()
       throws InterruptedException, RuntimeException {
     super();
-    _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+    _kafkaBrokers = "127.0.0.1:" + this.getKafkaServerPort();
   }
 
   @BeforeSuite
@@ -92,7 +93,7 @@ public class HighLevelConsumerTest extends KafkaTestBase {
 
   public static Config getSimpleConfig(Optional<String> prefix) {
     Properties properties = new Properties();
-    properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "localhost:1234");
+    properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "127.0.0.1:" + TestUtils.findFreePort());
     properties.put(getConfigKey(prefix, Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), Kafka09ConsumerClient.KAFKA_09_DEFAULT_KEY_DESERIALIZER);
     properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
     properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 103bfe0..77cca0c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -32,6 +32,7 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -94,6 +95,13 @@ public class KafkaExtractorStatsTracker {
   private List<KafkaPartition> partitions;
   private long maxPossibleLatency;
 
+  //Extractor stats aggregated across all partitions processed by the extractor.
+  @Getter (AccessLevel.PACKAGE)
+  @VisibleForTesting
+  private AggregateExtractorStats aggregateExtractorStats = new AggregateExtractorStats();
+  //Aggregate stats for the extractor derived from the most recently completed epoch
+  private AggregateExtractorStats lastAggregateExtractorStats;
+
   public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
     this.workUnitState = state;
     this.partitions = partitions;
@@ -144,7 +152,7 @@ public class KafkaExtractorStatsTracker {
   }
 
   /**
-   * A Java POJO that encapsulates various extractor stats.
+   * A Java POJO that encapsulates per-partition extractor stats.
    */
   @Data
   public static class ExtractorStats {
@@ -166,6 +174,21 @@ public class KafkaExtractorStatsTracker {
   }
 
   /**
+   * A Java POJO to track the aggregate extractor stats across all partitions processed by the extractor.
+   */
+  @Data
+  public static class AggregateExtractorStats {
+    private long maxIngestionLatency;
+    private long numBytesConsumed;
+    private long minStartFetchEpochTime = Long.MAX_VALUE;
+    private long maxStopFetchEpochTime;
+    private long minLogAppendTime = Long.MAX_VALUE;
+    private long maxLogAppendTime;
+    private long slaMissedRecordCount;
+    private long processedRecordCount;
+  }
+
+  /**
    *
    * @param partitionIdx index of Kafka topic partition.
    * @return the number of undecodeable records for a given partition id.
@@ -292,6 +315,31 @@ public class KafkaExtractorStatsTracker {
       return v;
     });
     onPartitionReadComplete(partitionIdx, readStartTime);
+    updateAggregateExtractorStats(partitionIdx);
+  }
+
+  private void updateAggregateExtractorStats(int partitionIdx) {
+    ExtractorStats partitionStats = this.statsMap.get(this.partitions.get(partitionIdx));
+
+    if (partitionStats.getStartFetchEpochTime() < aggregateExtractorStats.getMinStartFetchEpochTime()) {
+      aggregateExtractorStats.setMinStartFetchEpochTime(partitionStats.getStartFetchEpochTime());
+    }
+    if (partitionStats.getStopFetchEpochTime() > aggregateExtractorStats.getMaxStopFetchEpochTime()) {
+      aggregateExtractorStats.setMaxStopFetchEpochTime(partitionStats.getStopFetchEpochTime());
+    }
+    long partitionLatency = partitionStats.getStopFetchEpochTime() - partitionStats.getMinLogAppendTime();
+    if (aggregateExtractorStats.getMaxIngestionLatency() < partitionLatency) {
+      aggregateExtractorStats.setMaxIngestionLatency(partitionLatency);
+    }
+    if (aggregateExtractorStats.getMinLogAppendTime() > partitionStats.getMinLogAppendTime()) {
+      aggregateExtractorStats.setMinLogAppendTime(partitionStats.getMinLogAppendTime());
+    }
+    if (aggregateExtractorStats.getMaxLogAppendTime() < partitionStats.getMaxLogAppendTime()) {
+      aggregateExtractorStats.setMaxLogAppendTime(partitionStats.getMaxLogAppendTime());
+    }
+    aggregateExtractorStats.setProcessedRecordCount(aggregateExtractorStats.getProcessedRecordCount() + partitionStats.getProcessedRecordCount());
+    aggregateExtractorStats.setNumBytesConsumed(aggregateExtractorStats.getNumBytesConsumed() + partitionStats.getPartitionTotalSize());
+    aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount() + partitionStats.getSlaMissedRecordCount());
   }
 
   private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) {
@@ -451,9 +499,31 @@ public class KafkaExtractorStatsTracker {
   }
 
   /**
+   * @param timeUnit the time unit for the ingestion latency.
+   * @return the maximum ingestion latency across all partitions processed by the extractor from the last
+   * completed epoch.
+   */
+  public long getMaxIngestionLatency(TimeUnit timeUnit) {
+    return timeUnit.convert(this.lastAggregateExtractorStats.getMaxIngestionLatency(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   *
+   * @return the consumption rate in MB/s across all partitions processed by the extractor from the last
+   * completed epoch.
+   */
+  public double getConsumptionRateMBps() {
+    double consumptionDurationSecs = ((double) (this.lastAggregateExtractorStats.getMaxStopFetchEpochTime() - this.lastAggregateExtractorStats
+            .getMinStartFetchEpochTime())) / 1000;
+    return this.lastAggregateExtractorStats.getNumBytesConsumed() / (consumptionDurationSecs * (1024 * 1024L));
+  }
+
+  /**
    * Reset all KafkaExtractor stats.
    */
   public void reset() {
+    this.lastAggregateExtractorStats = this.aggregateExtractorStats;
+    this.aggregateExtractorStats = new AggregateExtractorStats();
     this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
     for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
       resetStartFetchEpochTime(partitionIdx);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
new file mode 100644
index 0000000..d7a8346
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
+
+
+@Slf4j
+@Alias(value = "KafkaIngestionHealthCheck")
+public class KafkaIngestionHealthCheck implements CommitStep {
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_PREFIX = "gobblin.kafka.healthCheck.";
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "slidingWindow.size";
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "ingestionLatency.minutes";
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "consumptionRate.dropOffFraction";
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "expected.consumptionRateMbps";
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "increasingLatencyCheckEnabled";
+
+  public static final int DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE = 3;
+  public static final long DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES= 15;
+  public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION = 0.7;
+  public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS = 10.0;
+  private static final boolean DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED = true;
+
+  private final Config config;
+  private final EventBus eventBus;
+  private final KafkaExtractorStatsTracker statsTracker;
+  private final double expectedConsumptionRate;
+  private final double consumptionRateDropOffFraction;
+  private final long ingestionLatencyThresholdMinutes;
+  private final int slidingWindowSize;
+  private final EvictingQueue<Long> ingestionLatencies;
+  private final EvictingQueue<Double> consumptionRateMBps;
+  private final boolean increasingLatencyCheckEnabled;
+
+  public KafkaIngestionHealthCheck(Config config, KafkaExtractorStatsTracker statsTracker) {
+    this.config = config;
+    this.slidingWindowSize = ConfigUtils.getInt(config, KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE);
+    this.ingestionLatencyThresholdMinutes = ConfigUtils.getLong(config, KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES);
+    this.consumptionRateDropOffFraction = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION);
+    this.expectedConsumptionRate = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS);
+    this.increasingLatencyCheckEnabled = ConfigUtils.getBoolean(config, KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED);
+    this.ingestionLatencies = EvictingQueue.create(this.slidingWindowSize); // bounded: adding to a full queue evicts its oldest entry
+    this.consumptionRateMBps = EvictingQueue.create(this.slidingWindowSize);
+    EventBus eventBus;
+    try {
+      eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+    } catch (IOException e) {
+      log.error("Could not find EventBus instance for container health check", e);
+      eventBus = null; // execute() null-checks this field: failures are then logged but never posted
+    }
+    this.eventBus = eventBus;
+    this.statsTracker = statsTracker;
+  }
+
+  /**
+   * Evaluates the latency observations currently held in the sliding window.
+   * @return true if (i) the ingestion latency in each of the recent epochs is at or above the threshold latency,
+   * AND (ii) if {@link KafkaIngestionHealthCheck#increasingLatencyCheckEnabled} is true, the latency is strictly
+   * increasing over these epochs.
+   */
+  private boolean checkIngestionLatency() {
+    long lastSeen = -1L;
+    for (long latency : this.ingestionLatencies) {
+      if (latency < this.ingestionLatencyThresholdMinutes) {
+        return false;
+      }
+      if (!this.increasingLatencyCheckEnabled) {
+        continue;
+      }
+      if (lastSeen >= latency) {
+        return false;
+      }
+      lastSeen = latency;
+    }
+    return true;
+  }
+
+  /**
+   * Always reports {@code false}: this class keeps no completion state for the step.
+   */
+  @Override
+  public boolean isCompleted()
+      throws IOException {
+    return false;
+  }
+
+  /**
+   * @return a one-line summary of the windowed latencies and consumption rates alongside their configured targets.
+   */
+  private String getHealthCheckReport() {
+    String template = "Ingestion Latencies = %s, Ingestion Latency Threshold = %s minutes, "
+        + "Consumption Rates = %s, Target Consumption Rate = %s MBps";
+    return String.format(template, this.ingestionLatencies, this.ingestionLatencyThresholdMinutes, this.consumptionRateMBps, this.expectedConsumptionRate);
+  }
+
+  /**
+   * Execute the commit step. Records the latest maximum ingestion latency and consumption rate, and emits a
+   * {@link ContainerHealthCheckFailureEvent} once the sliding window is full and both conditions hold:
+   * <ul>
+   *   <li>every latency in the window is at or above the configured threshold and, when the increasing-latency
+   *   check is enabled, the latencies are strictly increasing across the window, AND</li>
+   *   <li>the maximum consumption rate over the window does not exceed
+   *   {@link KafkaIngestionHealthCheck#consumptionRateDropOffFraction} * {@link KafkaIngestionHealthCheck#expectedConsumptionRate}.</li>
+   * </ul>
+   * The {@link ContainerHealthCheckFailureEvent} is posted to a global event bus (when one was obtained); its
+   * handlers can perform suitable actions based on the execution environment. The failure is logged either way.
+   */
+  @Override
+  public void execute() {
+    this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
+    this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+    if (ingestionLatencies.size() < this.slidingWindowSize) {
+      log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize);
+      return;
+    }
+
+    if (!checkIngestionLatency()) {
+      log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency Threshold: {}", this.ingestionLatencies.toString(), this.ingestionLatencyThresholdMinutes);
+      return;
+    }
+
+    double maxConsumptionRate = getMaxConsumptionRate();
+    if (maxConsumptionRate > this.consumptionRateDropOffFraction * this.expectedConsumptionRate) {
+      log.info("SUCCESS: Max. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", maxConsumptionRate, this.expectedConsumptionRate);
+      return;
+    }
+
+    log.error("FAILED: {}", getHealthCheckReport());
+
+    if (this.eventBus != null) {
+      log.info("Posting {} message to EventBus", ContainerHealthCheckFailureEvent.class.getSimpleName());
+      ContainerHealthCheckFailureEvent event = new ContainerHealthCheckFailureEvent(this.config, getClass().getName());
+      event.addMetadata("ingestionLatencies", this.ingestionLatencies.toString());
+      event.addMetadata("consumptionRates", this.consumptionRateMBps.toString());
+      event.addMetadata("ingestionLatencyThreshold", Long.toString(this.ingestionLatencyThresholdMinutes));
+      event.addMetadata("targetConsumptionRate", Double.toString(this.expectedConsumptionRate));
+      this.eventBus.post(event);
+    }
+  }
+
+  private double getMaxConsumptionRate() {
+    // Negative and NaN rates fail the >= 0.0 test and are ignored; if nothing remains the result is 0.0.
+    return this.consumptionRateMBps.stream().filter(rate -> rate >= 0.0).mapToDouble(Double::doubleValue).max().orElse(0.0);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 1f959f2..67e1146 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramLogReader;
@@ -40,6 +41,7 @@ public class KafkaExtractorStatsTrackerTest {
   private WorkUnitState workUnitState;
   final static KafkaPartition PARTITION0 =  new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
   final static KafkaPartition PARTITION1 =  new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
+  private long epochDurationMs;
 
   @BeforeClass
   public void setUp() {
@@ -80,12 +82,13 @@ public class KafkaExtractorStatsTrackerTest {
 
   @Test
   public void testOnDecodeableRecord() throws InterruptedException {
+    this.extractorStatsTracker.reset();
     long readStartTime = System.nanoTime();
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
     long currentTimeMillis = System.currentTimeMillis();
-    long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L;
-    long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L;
+    long logAppendTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(15);
+    long recordCreationTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(16);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
@@ -172,6 +175,20 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3);
+
+    long startFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStartFetchEpochTime();
+    long stopFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getStopFetchEpochTime();
+    this.epochDurationMs = stopFetchEpochTime - startFetchEpochTime;
+    long minLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime();
+    long maxLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime();
+    //Ensure aggregate extractor stats have been updated correctly for the completed epoch
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinStartFetchEpochTime(), startFetchEpochTime);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxStopFetchEpochTime(), stopFetchEpochTime);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinLogAppendTime(), minLogAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxLogAppendTime(), maxLogAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getNumBytesConsumed(), 300L);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getProcessedRecordCount(), 3L);
+    Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getSlaMissedRecordCount(), 1);
   }
 
   @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -189,6 +206,17 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
   }
 
+  @Test (dependsOnMethods = "testGetAvgRecordSize")
+  public void testGetMaxLatency() {
+    Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES) >= 15);
+  }
+
+  @Test (dependsOnMethods = "testGetAvgRecordSize")
+  public void testGetConsumptionRateMBps() {
+    double rateMBps = this.extractorStatsTracker.getConsumptionRateMBps();
+    Assert.assertEquals((long) (Math.ceil(rateMBps * epochDurationMs * 1024 * 1024) / 1000), 300L);
+  }
+
   @Test
   public void testGenerateTagsForPartitions() throws Exception {
     MultiLongWatermark lowWatermark = new MultiLongWatermark(Arrays.asList(new Long(10), new Long(20)));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
new file mode 100644
index 0000000..ff7049f
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
+
+@Test (singleThreaded = true)
+public class KafkaIngestionHealthCheckTest {
+  private EventBus eventBus;
+  private CountDownLatch countDownLatch;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+    this.eventBus.register(this);
+  }
+
+  @Subscribe
+  public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+    this.countDownLatch.countDown();
+  }
+
+  @Test
+  public void testExecuteIncreasingLatencyCheckEnabled()
+      throws InterruptedException {
+    this.countDownLatch = new CountDownLatch(1);
+    Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+        ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5));
+
+    KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
+    Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
+        .thenReturn(6L)
+        .thenReturn(7L)
+        .thenReturn(10L)
+        .thenReturn(5L);
+    Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
+        .thenReturn(2.0)
+        .thenReturn(1.5)
+        .thenReturn(2.1)
+        .thenReturn(2.5);
+
+    KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker);
+
+    //Latency increases continuously for the first 3 calls to execute().
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1L);
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1L);
+    check.execute();
+    //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0.
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 0);
+
+    //Set the countdown latch back to 1.
+    this.countDownLatch = new CountDownLatch(1);
+    //Latency decreases from 10 to 5. So check.execute() should not post any event to EventBus.
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1);
+
+    //With the increasing-latency check disabled, decreasing latencies at or above the threshold must still fail.
+    config = config.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false));
+    extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
+    Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
+        .thenReturn(10L).thenReturn(7L).thenReturn(5L);
+    Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
+        .thenReturn(2.0).thenReturn(1.5).thenReturn(2.1);
+    check = new KafkaIngestionHealthCheck(config, extractorStatsTracker);
+    //Fill the sliding window (default size 3); the third observation should post the failure event.
+    check.execute();
+    check.execute();
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 0);
+  }
+
+  @Test
+  public void testExecuteIncreasingLatencyCheckDisabled()
+      throws InterruptedException {
+    this.countDownLatch = new CountDownLatch(1);
+
+    Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+        ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false));
+
+    KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
+    Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
+        .thenReturn(10L)
+        .thenReturn(7L)
+        .thenReturn(6L)
+        .thenReturn(4L);
+    Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
+        .thenReturn(2.0)
+        .thenReturn(1.5)
+        .thenReturn(2.1)
+        .thenReturn(2.5);
+
+    KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker);
+
+    //Latency consistently above 5 minutes for the first 3 calls to execute().
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1L);
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1L);
+    check.execute();
+    //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0.
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 0);
+
+    //Set the countdown latch back to 1.
+    this.countDownLatch = new CountDownLatch(1);
+    //Latency decreases to 4. So check.execute() should not post any event to EventBus.
+    check.execute();
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), 1);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
similarity index 97%
rename from gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
rename to gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index ded0d5c..282ede6 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -25,9 +25,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
-
-import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.eclipse.jgit.api.Git;
@@ -52,12 +50,13 @@ import com.linkedin.restli.client.RestLiResponseException;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.core.GitConfigMonitor;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
@@ -93,6 +92,7 @@ public class GobblinServiceManagerTest {
   private FlowConfigClient flowConfigClient;
 
   private Git gitForPush;
+  private TestingServer testingServer;
 
   @BeforeClass
   public void setup() throws Exception {
@@ -100,14 +100,12 @@ public class GobblinServiceManagerTest {
     cleanUpDir(SPEC_STORE_PARENT_DIR);
     ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
-    KafkaTestBase kafkaTestHelper = new KafkaTestBase();
-    kafkaTestHelper.startServers();
-
+    testingServer = new TestingServer(true);
     Properties serviceCoreProperties = new Properties();
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
-    serviceCoreProperties.put("zookeeper.connect", kafkaTestHelper.getZkConnectString());
+    serviceCoreProperties.put("zookeeper.connect", testingServer.getConnectString());
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
 
     serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
@@ -178,6 +176,11 @@ public class GobblinServiceManagerTest {
     } catch (Exception e) {
       logger.warn("Could not completely cleanup Spec Store Parent Dir");
     }
+    try {
+      this.testingServer.close();
+    } catch(Exception e) {
+      System.err.println("Failed to close ZK testing server.");
+    }
   }
 
   @Test
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
new file mode 100644
index 0000000..e10e675
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.event;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+/**
+ * An event type to signal failure of a container health check. This event can be generated from anywhere
+ * inside the application. This event is intended to be emitted
+ * over an {@link com.google.common.eventbus.EventBus} instance.
+ */
+public class ContainerHealthCheckFailureEvent {
+  public static final String CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME = "ContainerHealthCheckEventBus";
+
+  @Getter
+  private final Config config;
+
+  /**
+   * Name of the class that generated this failure event.
+   */
+  @Getter
+  private final String className;
+
+  @Getter
+  private final Map<String, String> metadata = Maps.newHashMap();
+
+  public ContainerHealthCheckFailureEvent(Config config, String className) {
+    this.config = config;
+    this.className = className;
+  }
+
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java
new file mode 100644
index 0000000..4c4fdbd
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import java.io.IOException;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link SharedResourceFactory} for creating {@link EventBus} instances.
+ * @param <S> the {@link ScopeType} of the broker that this factory is used with.
+ */
+public class EventBusFactory<S extends ScopeType<S>> implements SharedResourceFactory<EventBus, EventBusKey, S> {
+  public static final String FACTORY_NAME = "eventbus";
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  public static <S extends ScopeType<S>> EventBus get(String eventBusName, SharedResourcesBroker<S> broker)
+      throws IOException {
+    try {
+      return broker.getSharedResource(new EventBusFactory<S>(), new EventBusKey(eventBusName));
+    } catch (NotConfiguredException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<EventBus> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, EventBusKey> config) {
+    EventBusKey eventBusKey = config.getKey();
+    EventBus eventBus = new EventBus(eventBusKey.getSourceClassName()); // the key's string doubles as the EventBus identifier
+    return new ResourceInstance<>(eventBus);
+  }
+
+  @Override
+  public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EventBusKey> config) {
+    return broker.selfScope().getType().rootScope(); // auto-scope at the root scope of the broker's scope type
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java
new file mode 100644
index 0000000..7f4c459
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+
+/** A {@link SharedResourceKey} that identifies an event bus by its {@code sourceClassName} string. */
+@Getter
+@EqualsAndHashCode
+public class EventBusKey implements SharedResourceKey {
+  private final String sourceClassName;
+
+  public EventBusKey(String sourceClassName) {
+    this.sourceClassName = sourceClassName;
+  }
+
+  /**
+   * @return the {@code sourceClassName} unchanged; users configure the shared resource using this value.
+   */
+  @Override
+  public String toConfigurationKey() {
+    return sourceClassName;
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java
new file mode 100644
index 0000000..18cd5a1
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
+import org.apache.gobblin.broker.SimpleScope;
+import org.apache.gobblin.broker.SimpleScopeType;
+import org.apache.gobblin.broker.iface.NoSuchScopeException;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+public class EventBusFactoryTest {
+
+  /** Verifies that {@link EventBusFactory} shares one {@link EventBus} per key at root scope by default. */
+  @Test
+  public void testGet() throws NotConfiguredException, IOException, NoSuchScopeException {
+    SharedResourcesBrokerImpl<SimpleScopeType> broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    EventBus eventBus1 = EventBusFactory.get(getClass().getSimpleName(), broker);
+    EventBus eventBus2 = EventBusFactory.get(getClass().getSimpleName(), broker);
+
+    //Repeated lookups with the same name must return the very same instance (identity, not mere equality)
+    Assert.assertSame(eventBus1, eventBus2);
+
+    SharedResourcesBroker<SimpleScopeType> subBroker =
+        broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local")).build();
+    EventBus eventBus3 = EventBusFactory.get(getClass().getSimpleName(), subBroker);
+    //A sub-scoped broker must resolve to the same instance, because the factory auto-scopes at the root scope
+    Assert.assertSame(eventBus1, eventBus3);
+
+    //Explicitly requesting the LOCAL scope must create a distinct eventbus
+    EventBus eventBus4 = subBroker.getSharedResourceAtScope(new EventBusFactory<>(), new EventBusKey(getClass().getSimpleName()), SimpleScopeType.LOCAL);
+    Assert.assertNotSame(eventBus3, eventBus4);
+
+    //A different source class name must map to a distinct eventbus
+    EventBus eventBus5 = EventBusFactory.get("", broker);
+    Assert.assertNotSame(eventBus1, eventBus5);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index f79a5bb..1abf8d2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -46,11 +45,13 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.GobblinTaskRunner;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 
 
@@ -209,6 +210,12 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
     } catch (ParseException pe) {
       printUsage(options);
       System.exit(1);
+    } catch (ContainerHealthCheckException e) {
+      // Ideally, we should not be catching this exception, as it is indicative of a non-recoverable failure. However,
+      // simply propagating the exception may prevent the container from exiting, because non-daemon threads may still
+      // be running in the application. Hence, we catch this exception to invoke System.exit(), which in turn ensures that all non-daemon threads are killed.
+      LOGGER.error("Exception encountered: {}", e);
+      System.exit(1);
     }
   }
 }
\ No newline at end of file
diff --git a/travis/test-groups.inc b/travis/test-groups.inc
index af6e517..8eef534 100644
--- a/travis/test-groups.inc
+++ b/travis/test-groups.inc
@@ -1 +1 @@
-TEST_GROUP1=gobbin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction
+TEST_GROUP1=gobbin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction,gobblin.util,gobblin.writer