You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 18:29:21 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1162] Provide an option to allow slow containers to commit suicide
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5cd38ac [GOBBLIN-1162] Provide an option to allow slow containers to commit suicide
5cd38ac is described below
commit 5cd38ac8d26dd4bccd8582a7182893f29ab5a009
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Jun 1 11:29:14 2020 -0700
[GOBBLIN-1162] Provide an option to allow slow containers to commit suicide
Closes #3002 from sv2000/containerSuicide
---
.../cluster/ContainerHealthCheckException.java | 36 +++++
.../cluster/GobblinClusterConfigurationKeys.java | 6 +
.../apache/gobblin/cluster/GobblinTaskRunner.java | 73 ++++++++-
.../cluster/HelixAssignedParticipantCheck.java | 5 +-
.../gobblin/cluster/GobblinTaskRunnerTest.java | 54 ++++++-
.../reporter/KafkaKeyValueProducerPusherTest.java | 4 +-
.../metrics/reporter/KafkaProducerPusherTest.java | 8 +-
.../gobblin/runtime/HighLevelConsumerTest.java | 5 +-
.../extract/kafka/KafkaExtractorStatsTracker.java | 72 ++++++++-
.../extract/kafka/KafkaIngestionHealthCheck.java | 174 +++++++++++++++++++++
.../kafka/KafkaExtractorStatsTrackerTest.java | 32 +++-
.../kafka/KafkaIngestionHealthCheckTest.java | 155 ++++++++++++++++++
.../gobblin/service/GobblinServiceManagerTest.java | 19 ++-
.../event/ContainerHealthCheckFailureEvent.java | 55 +++++++
.../gobblin/util/eventbus/EventBusFactory.java | 67 ++++++++
.../apache/gobblin/util/eventbus/EventBusKey.java | 42 +++++
.../gobblin/util/eventbus/EventBusFactoryTest.java | 65 ++++++++
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 9 +-
travis/test-groups.inc | 2 +-
19 files changed, 854 insertions(+), 29 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java
new file mode 100644
index 0000000..443bde6
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+/**
+ * Signals that the container has failed one or more health checks. In other words, the container has detected
+ * itself to be in an unhealthy state. The application may want to catch this exception to take an appropriate
+ * action e.g. exiting with an appropriate exit code.
+ */
+public class ContainerHealthCheckException extends RuntimeException {
+ public ContainerHealthCheckException() {
+ super();
+ }
+
+ public ContainerHealthCheckException(String message) {
+ super(message);
+ }
+
+ public ContainerHealthCheckException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 128a5d6..88585cd 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -177,6 +177,12 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = false;
+ //Config to enable/disable container "suicide" on health check failures. To be used in execution modes where the exiting
+ // container can be replaced with another container, e.g. Gobblin-on-Yarn mode.
+ public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
+ public static final boolean DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
+
+
//Config to enable/disable reuse of existing Helix Cluster
public static final String HELIX_CLUSTER_OVERWRITE_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
public static final boolean DEFAULT_HELIX_CLUSTER_OVERWRITE = true;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 103f0cc..68f98a2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -72,6 +72,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
@@ -80,15 +81,25 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
+import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -130,6 +141,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private final Optional<ContainerMetrics> containerMetrics;
private final List<Service> services = Lists.newArrayList();
private final Path appWorkPath;
+ //An EventBus instance that can be accessed from any component running within the worker process. The individual components can
+ // use the EventBus stream to communicate back application level health check results to the
+ // GobblinTaskRunner.
+ private final EventBus containerHealthEventBus;
@Getter
private HelixManager jobHelixManager;
@@ -138,11 +153,16 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private TaskStateModelFactory taskStateModelFactory;
private boolean isTaskDriver;
private boolean dedicatedTaskDriverCluster;
+ private boolean isContainerExitOnHealthCheckFailureEnabled;
+
private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
@Getter
private volatile boolean started = false;
private volatile boolean stopInProgress = false;
private volatile boolean isStopped = false;
+ @Getter
+ @Setter
+ private volatile boolean healthCheckFailed = false;
protected final String taskRunnerId;
protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
@@ -177,6 +197,23 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
// in the application configuration have to be extracted and set before initializing HelixManager.
HelixUtils.setSystemProperties(config);
+ this.isContainerExitOnHealthCheckFailureEnabled = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
+ GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED);
+
+ if (this.isContainerExitOnHealthCheckFailureEnabled) {
+ EventBus eventBus;
+ try {
+ eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+ } catch (IOException e) {
+ logger.error("Could not find EventBus instance for container health check", e);
+ eventBus = null;
+ }
+ this.containerHealthEventBus = eventBus;
+ } else {
+ this.containerHealthEventBus = null;
+ }
+
initHelixManager();
this.containerMetrics = buildContainerMetrics();
@@ -276,7 +313,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
/**
* Start this {@link GobblinTaskRunner} instance.
*/
- public void start() {
+ public void start() throws ContainerHealthCheckException {
logger.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId));
// Add a shutdown hook so the task scheduler gets properly shutdown
@@ -314,6 +351,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.taskRunnerId);
}
+ if (this.containerHealthEventBus != null) {
+ //Register itself with the container health event bus instance to receive container health events
+ logger.info("Registering GobblinTaskRunner with ContainerHealthCheckEventBus..");
+ this.containerHealthEventBus.register(this);
+ }
+
if (this.serviceManager != null) {
this.serviceManager.startAsync();
started = true;
@@ -321,6 +364,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
} else {
started = true;
}
+
+ //Check if the TaskRunner shutdown is invoked due to a health check failure. If yes, throw a RuntimeException
+ // that will be propagated to the caller.
+ if (this.isContainerExitOnHealthCheckFailureEnabled && GobblinTaskRunner.this.isHealthCheckFailed()) {
+ logger.error("GobblinTaskRunner finished due to health check failure.");
+ throw new ContainerHealthCheckException();
+ }
}
public synchronized void stop() {
@@ -670,6 +720,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
}
+ @Subscribe
+ public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+ logger.error("Received {} from: {}", event.getClass().getSimpleName(), event.getClassName());
+ logger.error("Submitting a ContainerHealthCheckFailureEvent..");
+ submitEvent(event);
+ logger.error("Stopping GobblinTaskRunner...");
+ GobblinTaskRunner.this.setHealthCheckFailed(true);
+ GobblinTaskRunner.this.stop();
+ }
+
+ private void submitEvent(ContainerHealthCheckFailureEvent event) {
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(RootMetricContext.get(), getClass().getPackage().getName()).build();
+ GobblinEventBuilder eventBuilder = new GobblinEventBuilder(event.getClass().getSimpleName());
+ State taskState = ConfigUtils.configToState(event.getConfig());
+ //Add task metadata such as Helix taskId, containerId, and workflowId if configured
+ TaskEventMetadataGenerator taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
+ eventBuilder.addAdditionalMetadata(taskEventMetadataGenerator.getMetadata(taskState, event.getClass().getSimpleName()));
+ eventBuilder.addAdditionalMetadata(event.getMetadata());
+ eventSubmitter.submit(eventBuilder);
+ }
+
private static String getApplicationId() {
return "1";
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 2102365..9aa1eeb 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -66,8 +66,6 @@ public class HelixAssignedParticipantCheck implements CommitStep {
private final int partitionNum;
private final Config config;
- private boolean isCompleted;
-
/**
* A method that uses the Singleton pattern to instantiate a {@link HelixManager} instance.
* @param config
@@ -114,7 +112,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
*/
@Override
public boolean isCompleted() {
- return isCompleted;
+ return false;
}
/**
@@ -157,7 +155,6 @@ public class HelixAssignedParticipantCheck implements CommitStep {
isParticipant = true;
}
- this.isCompleted = true;
if (!isParticipant) {
throw new CommitStepException(String.format("Helix instance %s not the assigned participant for partition %d",this.helixInstanceName, this.partitionNum));
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 98d13df..7a0b879 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.URL;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
@@ -38,12 +40,16 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
/**
@@ -68,9 +74,10 @@ public class GobblinTaskRunnerTest {
private TestingServer testingZKServer;
private GobblinTaskRunner gobblinTaskRunner;
+ private GobblinTaskRunner gobblinTaskRunnerHealthCheck;
+ private GobblinTaskRunner corruptGobblinTaskRunner;
private GobblinClusterManager gobblinClusterManager;
- private GobblinTaskRunner corruptGobblinTaskRunner;
private String clusterName;
private String corruptHelixInstance;
private TaskAssignmentAfterConnectionRetry suite;
@@ -101,7 +108,14 @@ public class GobblinTaskRunnerTest {
this.gobblinTaskRunner =
new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
- this.gobblinTaskRunner.connectHelixManager();
+
+ // Participant
+ String healthCheckInstance = HelixUtils.getHelixInstanceName("HealthCheckHelixInstance", 0);
+ this.gobblinTaskRunnerHealthCheck =
+ new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, healthCheckInstance,
+ TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID,
+ config.withValue(GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED, ConfigValueFactory.fromAnyRef(true))
+ , Optional.<Path>absent());
// Participant with a partial Instance set up on Helix/ZK
this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
@@ -118,6 +132,8 @@ public class GobblinTaskRunnerTest {
@Test
public void testSendReceiveShutdownMessage() throws Exception {
+ this.gobblinTaskRunner.connectHelixManager();
+
ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start());
@@ -148,7 +164,6 @@ public class GobblinTaskRunnerTest {
Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
}
-
@Test
public void testConnectHelixManagerWithRetry() {
HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
@@ -196,6 +211,38 @@ public class GobblinTaskRunnerTest {
helixManager.disconnect();
}
+ @Test (groups = {"disabledOnTravis"}, dependsOnMethods = "testSendReceiveShutdownMessage", expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*ContainerHealthCheckException.*")
+ public void testShutdownOnHealthCheckFailure() throws Exception {
+ this.gobblinTaskRunnerHealthCheck.connectHelixManager();
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ Future future = service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.start());
+
+ Logger log = LoggerFactory.getLogger("testHandleContainerHealthCheckFailure");
+
+ // Give Helix some time to start the task runner
+ AssertWithBackoff.create().logger(log).timeoutMs(20000)
+ .assertTrue(new Predicate<Void>() {
+ @Override public boolean apply(Void input) {
+ return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStarted();
+ }
+ }, "gobblinTaskRunner started");
+
+ EventBus eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+ eventBus.post(new ContainerHealthCheckFailureEvent(ConfigFactory.empty(), getClass().getName()));
+
+ // Give some time to allow GobblinTaskRunner to handle the ContainerHealthCheckFailureEvent
+ AssertWithBackoff.create().logger(log).timeoutMs(30000)
+ .assertTrue(new Predicate<Void>() {
+ @Override public boolean apply(Void input) {
+ return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStopped();
+ }
+ }, "gobblinTaskRunner stopped");
+
+ //Call Future#get() to check and ensure that ContainerHealthCheckException is thrown
+ future.get();
+ }
public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) {
@@ -223,6 +270,7 @@ public class GobblinTaskRunnerTest {
this.gobblinClusterManager.disconnectHelixManager();
this.gobblinTaskRunner.disconnectHelixManager();
this.corruptGobblinTaskRunner.disconnectHelixManager();
+ this.gobblinTaskRunnerHealthCheck.disconnectHelixManager();
if (this.suite != null) {
this.suite.shutdownCluster();
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
index 1681d7b..f0914b6 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
@@ -58,9 +58,9 @@ public class KafkaKeyValueProducerPusherTest {
@Test
public void test() throws IOException {
// Test that the scoped config overrides the generic config
- Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
+ Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("127.0.0.1:dummy", TOPIC,
Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
String msg1 = "msg1";
String msg2 = "msg2";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
index 723f8b7..5531213 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -30,12 +30,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
+import kafka.consumer.ConsumerIterator;
+
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;
-import kafka.consumer.ConsumerIterator;
-
/**
* Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
@@ -56,8 +56,8 @@ public class KafkaProducerPusherTest {
@Test
public void test() throws IOException {
// Test that the scoped config overrides the generic config
- Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+ Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
String msg1 = "msg1";
String msg2 = "msg2";
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index c101d15..a54a75c 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.test.TestUtils;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
@@ -67,7 +68,7 @@ public class HighLevelConsumerTest extends KafkaTestBase {
public HighLevelConsumerTest()
throws InterruptedException, RuntimeException {
super();
- _kafkaBrokers = "localhost:" + this.getKafkaServerPort();
+ _kafkaBrokers = "127.0.0.1:" + this.getKafkaServerPort();
}
@BeforeSuite
@@ -92,7 +93,7 @@ public class HighLevelConsumerTest extends KafkaTestBase {
public static Config getSimpleConfig(Optional<String> prefix) {
Properties properties = new Properties();
- properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "localhost:1234");
+ properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "127.0.0.1:" + TestUtils.findFreePort());
properties.put(getConfigKey(prefix, Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), Kafka09ConsumerClient.KAFKA_09_DEFAULT_KEY_DESERIALIZER);
properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 103bfe0..77cca0c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -32,6 +32,7 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -94,6 +95,13 @@ public class KafkaExtractorStatsTracker {
private List<KafkaPartition> partitions;
private long maxPossibleLatency;
+ //Extractor stats aggregated across all partitions processed by the extractor.
+ @Getter (AccessLevel.PACKAGE)
+ @VisibleForTesting
+ private AggregateExtractorStats aggregateExtractorStats = new AggregateExtractorStats();
+ //Aggregate stats for the extractor derived from the most recently completed epoch
+ private AggregateExtractorStats lastAggregateExtractorStats;
+
public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
this.workUnitState = state;
this.partitions = partitions;
@@ -144,7 +152,7 @@ public class KafkaExtractorStatsTracker {
}
/**
- * A Java POJO that encapsulates various extractor stats.
+ * A Java POJO that encapsulates per-partition extractor stats.
*/
@Data
public static class ExtractorStats {
@@ -166,6 +174,21 @@ public class KafkaExtractorStatsTracker {
}
/**
+ * A Java POJO to track the aggregate extractor stats across all partitions processed by the extractor.
+ */
+ @Data
+ public static class AggregateExtractorStats {
+ private long maxIngestionLatency;
+ private long numBytesConsumed;
+ private long minStartFetchEpochTime = Long.MAX_VALUE;
+ private long maxStopFetchEpochTime;
+ private long minLogAppendTime = Long.MAX_VALUE;
+ private long maxLogAppendTime;
+ private long slaMissedRecordCount;
+ private long processedRecordCount;
+ }
+
+ /**
*
* @param partitionIdx index of Kafka topic partition.
* @return the number of undecodeable records for a given partition id.
@@ -292,6 +315,31 @@ public class KafkaExtractorStatsTracker {
return v;
});
onPartitionReadComplete(partitionIdx, readStartTime);
+ updateAggregateExtractorStats(partitionIdx);
+ }
+
+ private void updateAggregateExtractorStats(int partitionIdx) {
+ ExtractorStats partitionStats = this.statsMap.get(this.partitions.get(partitionIdx));
+
+ if (partitionStats.getStartFetchEpochTime() < aggregateExtractorStats.getMinStartFetchEpochTime()) {
+ aggregateExtractorStats.setMinStartFetchEpochTime(partitionStats.getStartFetchEpochTime());
+ }
+ if (partitionStats.getStopFetchEpochTime() > aggregateExtractorStats.getMaxStopFetchEpochTime()) {
+ aggregateExtractorStats.setMaxStopFetchEpochTime(partitionStats.getStopFetchEpochTime());
+ }
+ long partitionLatency = partitionStats.getStopFetchEpochTime() - partitionStats.getMinLogAppendTime();
+ if (aggregateExtractorStats.getMaxIngestionLatency() < partitionLatency) {
+ aggregateExtractorStats.setMaxIngestionLatency(partitionLatency);
+ }
+ if (aggregateExtractorStats.getMinLogAppendTime() > partitionStats.getMinLogAppendTime()) {
+ aggregateExtractorStats.setMinLogAppendTime(partitionStats.getMinLogAppendTime());
+ }
+ if (aggregateExtractorStats.getMaxLogAppendTime() < partitionStats.getMaxLogAppendTime()) {
+ aggregateExtractorStats.setMaxLogAppendTime(partitionStats.getMaxLogAppendTime());
+ }
+ aggregateExtractorStats.setProcessedRecordCount(aggregateExtractorStats.getProcessedRecordCount() + partitionStats.getProcessedRecordCount());
+ aggregateExtractorStats.setNumBytesConsumed(aggregateExtractorStats.getNumBytesConsumed() + partitionStats.getPartitionTotalSize());
+ aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount() + partitionStats.getSlaMissedRecordCount());
}
private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) {
@@ -451,9 +499,31 @@ public class KafkaExtractorStatsTracker {
}
/**
+ * @param timeUnit the time unit for the ingestion latency.
+ * @return the maximum ingestion latency across all partitions processed by the extractor from the last
+ * completed epoch.
+ */
+ public long getMaxIngestionLatency(TimeUnit timeUnit) {
+ return timeUnit.convert(this.lastAggregateExtractorStats.getMaxIngestionLatency(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ *
+ * @return the consumption rate in MB/s across all partitions processed by the extractor from the last
+ * completed epoch.
+ */
+ public double getConsumptionRateMBps() {
+ double consumptionDurationSecs = ((double) (this.lastAggregateExtractorStats.getMaxStopFetchEpochTime() - this.lastAggregateExtractorStats
+ .getMinStartFetchEpochTime())) / 1000;
+ return this.lastAggregateExtractorStats.getNumBytesConsumed() / (consumptionDurationSecs * (1024 * 1024L));
+ }
+
+ /**
* Reset all KafkaExtractor stats.
*/
public void reset() {
+ this.lastAggregateExtractorStats = this.aggregateExtractorStats;
+ this.aggregateExtractorStats = new AggregateExtractorStats();
this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
resetStartFetchEpochTime(partitionIdx);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
new file mode 100644
index 0000000..d7a8346
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
+
+
+/**
+ * A {@link CommitStep} that watches the ingestion latency and consumption rate reported by a
+ * {@link KafkaExtractorStatsTracker} over a sliding window of recent epochs, and posts a
+ * {@link ContainerHealthCheckFailureEvent} to the container health check {@link EventBus} when ingestion
+ * appears to be persistently lagging. See {@link #execute()} for the exact failure criteria.
+ */
+@Slf4j
+@Alias(value = "KafkaIngestionHealthCheck")
+public class KafkaIngestionHealthCheck implements CommitStep {
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_PREFIX = "gobblin.kafka.healthCheck.";
+  /** Number of most recent epochs the check looks at; must be positive. */
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "slidingWindow.size";
+  /** Latency, in minutes, at or above which an epoch counts as lagging. */
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "ingestionLatency.minutes";
+  /** Fraction of the expected consumption rate at or below which consumption counts as too slow. */
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "consumptionRate.dropOffFraction";
+  /** Expected consumption rate, in MB/s. */
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "expected.consumptionRateMbps";
+  /** When true, the latency must also strictly increase across the window for the check to fail. */
+  public static final String KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "increasingLatencyCheckEnabled";
+
+  public static final int DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE = 3;
+  public static final long DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES = 15;
+  public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION = 0.7;
+  public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS = 10.0;
+  private static final boolean DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED = true;
+
+  private final Config config;
+  // Null if the event bus could not be obtained; failures are then only logged.
+  private final EventBus eventBus;
+  private final KafkaExtractorStatsTracker statsTracker;
+  private final double expectedConsumptionRate;
+  private final double consumptionRateDropOffFraction;
+  private final long ingestionLatencyThresholdMinutes;
+  private final int slidingWindowSize;
+  // Per-epoch max ingestion latencies (minutes) and consumption rates (MB/s), oldest first.
+  private final EvictingQueue<Long> ingestionLatencies;
+  private final EvictingQueue<Double> consumptionRateMBps;
+  private final boolean increasingLatencyCheckEnabled;
+
+  /**
+   * @param config config from which the {@code gobblin.kafka.healthCheck.*} settings are read.
+   * @param statsTracker source of the per-epoch maximum ingestion latency and consumption rate.
+   * @throws IllegalArgumentException if the configured sliding window size is not positive.
+   */
+  public KafkaIngestionHealthCheck(Config config, KafkaExtractorStatsTracker statsTracker) {
+    this.config = config;
+    this.slidingWindowSize = ConfigUtils.getInt(config, KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE);
+    if (this.slidingWindowSize <= 0) {
+      // A zero-sized window retains no observations, which would make every execute() report a failure.
+      throw new IllegalArgumentException(KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY + " must be positive, got " + this.slidingWindowSize);
+    }
+    this.ingestionLatencyThresholdMinutes = ConfigUtils.getLong(config, KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES);
+    this.consumptionRateDropOffFraction = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION);
+    this.expectedConsumptionRate = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS);
+    this.increasingLatencyCheckEnabled = ConfigUtils.getBoolean(config, KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED);
+    this.ingestionLatencies = EvictingQueue.create(this.slidingWindowSize);
+    this.consumptionRateMBps = EvictingQueue.create(this.slidingWindowSize);
+    this.eventBus = getContainerHealthCheckEventBus();
+    this.statsTracker = statsTracker;
+  }
+
+  /**
+   * Looks up the shared container health check {@link EventBus} from the implicit broker.
+   * @return the event bus, or null if it could not be obtained (the error is logged).
+   */
+  private static EventBus getContainerHealthCheckEventBus() {
+    try {
+      return EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+    } catch (IOException e) {
+      log.error("Could not find EventBus instance for container health check", e);
+      return null;
+    }
+  }
+
+  /**
+   * @return true if (i) the ingestion latency of every epoch in the window is at or above the threshold, AND (ii) when
+   *         {@link KafkaIngestionHealthCheck#increasingLatencyCheckEnabled} is true, the latency strictly increases
+   *         from each epoch to the next.
+   */
+  private boolean checkIngestionLatency() {
+    long previousLatency = -1L;
+    for (long ingestionLatency : this.ingestionLatencies) {
+      if (ingestionLatency < this.ingestionLatencyThresholdMinutes) {
+        return false;
+      }
+      if (this.increasingLatencyCheckEnabled && previousLatency >= ingestionLatency) {
+        return false;
+      }
+      previousLatency = ingestionLatency;
+    }
+    return true;
+  }
+
+  /**
+   * Always returns false; presumably so the step runs on every commit (NOTE(review): confirm against {@link CommitStep} callers).
+   */
+  @Override
+  public boolean isCompleted()
+      throws IOException {
+    return false;
+  }
+
+  /**
+   * @return a human-readable summary of the recorded observations and the configured thresholds.
+   */
+  private String getHealthCheckReport() {
+    return String.format("Ingestion Latencies = %s, Ingestion Latency Threshold = %s minutes, "
+        + "Consumption Rates = %s, Target Consumption Rate = %s MBps", this.ingestionLatencies.toString(),
+        this.ingestionLatencyThresholdMinutes, this.consumptionRateMBps.toString(), this.expectedConsumptionRate);
+  }
+
+  /**
+   * Execute the commit step. Records the maximum ingestion latency and the consumption rate of the last completed
+   * epoch, and posts a {@link ContainerHealthCheckFailureEvent} if all of the following hold:
+   * <ul>
+   *   <li>{@link KafkaIngestionHealthCheck#slidingWindowSize} observations have been recorded, AND</li>
+   *   <li>the ingestion latency of each of these epochs is at or above the threshold and, if
+   *   {@link KafkaIngestionHealthCheck#increasingLatencyCheckEnabled} is set, strictly increasing, AND</li>
+   *   <li>the maximum non-negative consumption rate over these epochs (0 if there is none) is no greater than
+   *   {@link KafkaIngestionHealthCheck#consumptionRateDropOffFraction} * {@link KafkaIngestionHealthCheck#expectedConsumptionRate}.</li>
+   * </ul>
+   *
+   * The event is posted to a global event bus, whose handlers can take suitable actions for the execution
+   * environment. If that event bus could not be obtained, the failure is only logged.
+   */
+  @Override
+  public void execute() {
+    this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
+    this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+    if (this.ingestionLatencies.size() < this.slidingWindowSize) {
+      log.info("SUCCESS: Num observations: {} smaller than {}", this.ingestionLatencies.size(), this.slidingWindowSize);
+      return;
+    }
+
+    if (!checkIngestionLatency()) {
+      log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency Threshold: {}", this.ingestionLatencies.toString(), this.ingestionLatencyThresholdMinutes);
+      return;
+    }
+
+    // Even the best (maximum) rate in the window must be at or below the drop-off threshold for the check to fail.
+    double maxConsumptionRate = getMaxConsumptionRate();
+    if (maxConsumptionRate > this.consumptionRateDropOffFraction * this.expectedConsumptionRate) {
+      log.info("SUCCESS: Max. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", maxConsumptionRate, this.expectedConsumptionRate);
+      return;
+    }
+
+    log.error("FAILED: {}", getHealthCheckReport());
+
+    if (this.eventBus != null) {
+      log.info("Posting {} message to EventBus", ContainerHealthCheckFailureEvent.class.getSimpleName());
+      ContainerHealthCheckFailureEvent event = new ContainerHealthCheckFailureEvent(this.config, getClass().getName());
+      event.addMetadata("ingestionLatencies", this.ingestionLatencies.toString());
+      event.addMetadata("consumptionRates", this.consumptionRateMBps.toString());
+      event.addMetadata("ingestionLatencyThreshold", Long.toString(this.ingestionLatencyThresholdMinutes));
+      event.addMetadata("targetConsumptionRate", Double.toString(this.expectedConsumptionRate));
+      this.eventBus.post(event);
+    }
+  }
+
+  /**
+   * @return the largest non-negative consumption rate in the window, or 0.0 if there is none. Negative and NaN rates
+   *         are skipped because they cannot represent a valid throughput.
+   */
+  private double getMaxConsumptionRate() {
+    return this.consumptionRateMBps.stream().mapToDouble(Double::doubleValue)
+        .filter(consumptionRate -> consumptionRate >= 0.0).max().orElse(0.0);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 1f959f2..67e1146 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogReader;
@@ -40,6 +41,7 @@ public class KafkaExtractorStatsTrackerTest {
private WorkUnitState workUnitState;
final static KafkaPartition PARTITION0 = new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
final static KafkaPartition PARTITION1 = new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
+ private long epochDurationMs;
@BeforeClass
public void setUp() {
@@ -80,12 +82,13 @@ public class KafkaExtractorStatsTrackerTest {
@Test
public void testOnDecodeableRecord() throws InterruptedException {
+ this.extractorStatsTracker.reset();
long readStartTime = System.nanoTime();
Thread.sleep(1);
long decodeStartTime = System.nanoTime();
long currentTimeMillis = System.currentTimeMillis();
- long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L;
- long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L;
+ long logAppendTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(15);
+ long recordCreationTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(16);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
@@ -172,6 +175,20 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3);
+
+ long startFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStartFetchEpochTime();
+ long stopFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getStopFetchEpochTime();
+ this.epochDurationMs = stopFetchEpochTime - startFetchEpochTime;
+ long minLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime();
+ long maxLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime();
+ //Ensure aggregate extractor stats have been updated correctly for the completed epoch
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinStartFetchEpochTime(), startFetchEpochTime);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxStopFetchEpochTime(), stopFetchEpochTime);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinLogAppendTime(), minLogAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxLogAppendTime(), maxLogAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getNumBytesConsumed(), 300L);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getProcessedRecordCount(), 3L);
+ Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getSlaMissedRecordCount(), 1);
}
@Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -189,6 +206,17 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
}
+  @Test (dependsOnMethods = "testGetAvgRecordSize")
+  public void testGetMaxLatency() {
+    Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES) >= 15);
+  }
+
+  @Test (dependsOnMethods = "testGetAvgRecordSize")
+  public void testGetConsumptionRateMBps() {
+    // Convert the MB/s rate back into bytes consumed over the epoch; rounding absorbs floating-point error.
+    Assert.assertEquals(Math.round(this.extractorStatsTracker.getConsumptionRateMBps() * epochDurationMs * 1024 * 1024 / 1000), 300L);
+  }
+
@Test
public void testGenerateTagsForPartitions() throws Exception {
MultiLongWatermark lowWatermark = new MultiLongWatermark(Arrays.asList(new Long(10), new Long(20)));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
new file mode 100644
index 0000000..ff7049f
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
+
+@Test (singleThreaded = true)
+public class KafkaIngestionHealthCheckTest {
+  private EventBus eventBus;
+  private CountDownLatch countDownLatch;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+    this.eventBus.register(this);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    // The event bus is a broker-shared resource; stop receiving events once this class is done.
+    this.eventBus.unregister(this);
+  }
+
+  @Subscribe
+  public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+    this.countDownLatch.countDown();
+  }
+
+  /**
+   * Briefly waits for a failure event, then asserts how many latch counts remain (0 means an event was received).
+   */
+  private void assertRemainingLatchCount(long expectedCount) throws InterruptedException {
+    this.countDownLatch.await(10, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(this.countDownLatch.getCount(), expectedCount);
+  }
+
+  @Test
+  public void testExecuteIncreasingLatencyCheckEnabled()
+      throws InterruptedException {
+    this.countDownLatch = new CountDownLatch(1);
+    Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+        ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5));
+
+    KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
+    Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
+        .thenReturn(6L)
+        .thenReturn(7L)
+        .thenReturn(10L)
+        .thenReturn(5L);
+    Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
+        .thenReturn(2.0)
+        .thenReturn(1.5)
+        .thenReturn(2.1)
+        .thenReturn(2.5);
+
+    KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker);
+
+    //Latency increases continuously for the first 3 calls to execute().
+    check.execute();
+    assertRemainingLatchCount(1L);
+    check.execute();
+    assertRemainingLatchCount(1L);
+    check.execute();
+    //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0.
+    assertRemainingLatchCount(0L);
+
+    //Set the countdown latch back to 1.
+    this.countDownLatch = new CountDownLatch(1);
+    //Latency decreases from 10 to 5. So check.execute() should not post any event to EventBus.
+    check.execute();
+    assertRemainingLatchCount(1L);
+  }
+
+  @Test
+  public void testExecuteIncreasingLatencyCheckDisabled()
+      throws InterruptedException {
+    this.countDownLatch = new CountDownLatch(1);
+
+    Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY,
+        ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5))
+        .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false));
+
+    KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class);
+    Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
+        .thenReturn(10L)
+        .thenReturn(7L)
+        .thenReturn(6L)
+        .thenReturn(4L);
+    Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
+        .thenReturn(2.0)
+        .thenReturn(1.5)
+        .thenReturn(2.1)
+        .thenReturn(2.5);
+
+    KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker);
+
+    //Latency consistently above 5 minutes for the first 3 calls to execute().
+    check.execute();
+    assertRemainingLatchCount(1L);
+    check.execute();
+    assertRemainingLatchCount(1L);
+    check.execute();
+    //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0.
+    assertRemainingLatchCount(0L);
+
+    //Set the countdown latch back to 1.
+    this.countDownLatch = new CountDownLatch(1);
+    //Latency decreases to 4. So check.execute() should not post any event to EventBus.
+    check.execute();
+    assertRemainingLatchCount(1L);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
similarity index 97%
rename from gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
rename to gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index ded0d5c..282ede6 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -25,9 +25,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-
-import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
@@ -52,12 +50,13 @@ import com.linkedin.restli.client.RestLiResponseException;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.core.GitConfigMonitor;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
@@ -93,6 +92,7 @@ public class GobblinServiceManagerTest {
private FlowConfigClient flowConfigClient;
private Git gitForPush;
+ private TestingServer testingServer;
@BeforeClass
public void setup() throws Exception {
@@ -100,14 +100,12 @@ public class GobblinServiceManagerTest {
cleanUpDir(SPEC_STORE_PARENT_DIR);
ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- KafkaTestBase kafkaTestHelper = new KafkaTestBase();
- kafkaTestHelper.startServers();
-
+ testingServer = new TestingServer(true);
Properties serviceCoreProperties = new Properties();
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
- serviceCoreProperties.put("zookeeper.connect", kafkaTestHelper.getZkConnectString());
+ serviceCoreProperties.put("zookeeper.connect", testingServer.getConnectString());
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
@@ -178,6 +176,11 @@ public class GobblinServiceManagerTest {
} catch (Exception e) {
logger.warn("Could not completely cleanup Spec Store Parent Dir");
}
+ try {
+ this.testingServer.close();
+ } catch(Exception e) {
+ System.err.println("Failed to close ZK testing server.");
+ }
}
@Test
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
new file mode 100644
index 0000000..e10e675
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.event;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+/**
+ * An event type to signal failure of a container health check. This event can be generated from anywhere
+ * inside the application and is intended to be posted on an {@link com.google.common.eventbus.EventBus}
+ * instance, such as the one named {@link #CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME}.
+ */
+public class ContainerHealthCheckFailureEvent {
+  /** Name of the {@link com.google.common.eventbus.EventBus} used for container health check events. */
+  public static final String CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME = "ContainerHealthCheckEventBus";
+
+  /**
+   * Config supplied by the producer of this event.
+   */
+  @Getter
+  private final Config config;
+
+  /**
+   * Name of the class that generated this failure event.
+   */
+  @Getter
+  private final String className;
+
+  /**
+   * Free-form key/value details about the failure, added via {@link #addMetadata(String, String)}.
+   */
+  @Getter
+  private final Map<String, String> metadata = Maps.newHashMap();
+
+  /**
+   * @param config config supplied by the producer of this event.
+   * @param className name of the class that generated this failure event.
+   */
+  public ContainerHealthCheckFailureEvent(Config config, String className) {
+    this.config = config;
+    this.className = className;
+  }
+
+  /**
+   * Adds (or overwrites) a metadata entry describing the failure.
+   */
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java
new file mode 100644
index 0000000..4c4fdbd
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import java.io.IOException;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link SharedResourceFactory} for creating {@link EventBus} instances, one per {@link EventBusKey}. Event buses
+ * are auto-scoped to the root scope of the broker's scope type (see {@link #getAutoScope}).
+ *
+ * @param <S> the {@link ScopeType} of the brokers this factory is used with.
+ */
+public class EventBusFactory<S extends ScopeType<S>> implements SharedResourceFactory<EventBus, EventBusKey, S> {
+  public static final String FACTORY_NAME = "eventbus";
+
+  /**
+   * Obtains the shared {@link EventBus} named {@code eventBusName} from {@code broker}. Repeated calls with the same
+   * name and broker are expected to return the same instance.
+   *
+   * @param eventBusName name of the event bus; used as the {@link EventBusKey}.
+   * @param broker broker holding the shared event buses.
+   * @return the event bus for {@code eventBusName}.
+   * @throws IOException wrapping the {@link NotConfiguredException} thrown by the broker.
+   */
+  public static <S extends ScopeType<S>> EventBus get(String eventBusName, SharedResourcesBroker<S> broker)
+      throws IOException {
+    EventBusFactory<S> factory = new EventBusFactory<>();
+    EventBusKey key = new EventBusKey(eventBusName);
+    try {
+      return broker.getSharedResource(factory, key);
+    } catch (NotConfiguredException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  /**
+   * Creates a new {@link EventBus} whose identifier is the name carried by the requested {@link EventBusKey}.
+   */
+  @Override
+  public SharedResourceFactoryResponse<EventBus> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, EventBusKey> config) {
+    String identifier = config.getKey().getSourceClassName();
+    return new ResourceInstance<>(new EventBus(identifier));
+  }
+
+  /**
+   * @return the root scope of {@code broker}'s own scope type.
+   */
+  @Override
+  public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EventBusKey> config) {
+    S selfScopeType = broker.selfScope().getType();
+    return selfScopeType.rootScope();
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java
new file mode 100644
index 0000000..7f4c459
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+
+/**
+ * A {@link SharedResourceKey} identifying an {@link com.google.common.eventbus.EventBus} by a single string.
+ * Equality and hash code are based solely on that string.
+ */
+@EqualsAndHashCode
+@Getter
+public class EventBusKey implements SharedResourceKey {
+  /** Identifier of the event bus; callers pass e.g. an event bus name or a class name. */
+  private final String sourceClassName;
+
+  /**
+   * @param sourceClassName identifier of the event bus.
+   */
+  public EventBusKey(String sourceClassName) {
+    this.sourceClassName = sourceClassName;
+  }
+
+  /**
+   * @return the identifier itself, which users reference when configuring this shared resource.
+   */
+  @Override
+  public String toConfigurationKey() {
+    return sourceClassName;
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java
new file mode 100644
index 0000000..18cd5a1
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.eventbus;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
+import org.apache.gobblin.broker.SimpleScope;
+import org.apache.gobblin.broker.SimpleScopeType;
+import org.apache.gobblin.broker.iface.NoSuchScopeException;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * Tests for {@link EventBusFactory}: the same {@link EventBusKey} resolves to a shared {@link EventBus}, while an
+ * explicitly narrower scope or a different key yields a different instance.
+ */
+public class EventBusFactoryTest {
+
+ @Test
+ public void testGet()
+ throws NotConfiguredException, IOException, NoSuchScopeException {
+ String sourceClassName = getClass().getSimpleName();
+ SharedResourcesBrokerImpl<SimpleScopeType> broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(
+ ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+ EventBus eventBus1 = EventBusFactory.get(sourceClassName, broker);
+ EventBus eventBus2 = EventBusFactory.get(sourceClassName, broker);
+
+ // Repeated lookups with the same key should return the very same EventBus instance
+ Assert.assertSame(eventBus1, eventBus2);
+
+ SharedResourcesBroker<SimpleScopeType> subBroker =
+ broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local")).build();
+ EventBus eventBus3 = EventBusFactory.get(sourceClassName, subBroker);
+ // A lookup through a sub-scoped broker should still return the same EventBus instance
+ Assert.assertSame(eventBus1, eventBus3);
+
+ // Explicitly requesting the LOCAL scope should create a distinct EventBus
+ EventBus eventBus4 = subBroker.getSharedResourceAtScope(new EventBusFactory<>(), new EventBusKey(sourceClassName), SimpleScopeType.LOCAL);
+ Assert.assertNotSame(eventBus3, eventBus4);
+
+ // A different source class name should map to a different EventBus
+ EventBus eventBus5 = EventBusFactory.get("", broker);
+ Assert.assertNotSame(eventBus1, eventBus5);
+ }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index f79a5bb..1abf8d2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.util.logs.LogCopier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -46,11 +45,13 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinTaskRunner;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -209,6 +210,12 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 } catch (ParseException pe) {
 printUsage(options);
 System.exit(1);
+ } catch (ContainerHealthCheckException e) {
+ // Ideally, we should not catch this exception, as it indicates a non-recoverable failure. However, simply
+ // propagating it may keep the container from exiting, because non-daemon threads in the application keep the
+ // JVM alive. Hence, we catch it and invoke System.exit(), which ensures that all non-daemon threads are killed.
+ LOGGER.error("Exception encountered", e);
+ System.exit(1);
 }
 }
 }
\ No newline at end of file
diff --git a/travis/test-groups.inc b/travis/test-groups.inc
index af6e517..8eef534 100644
--- a/travis/test-groups.inc
+++ b/travis/test-groups.inc
@@ -1 +1 @@
-TEST_GROUP1=gobbin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction
+TEST_GROUP1=gobblin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction,gobblin.util,gobblin.writer