You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/04/08 02:58:42 UTC
[druid] branch 0.18.0 updated: Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610) (#9641)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 325f299 Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610) (#9641)
325f299 is described below
commit 325f29977740b1ff446f01b98179e377760e47f4
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Tue Apr 7 16:58:27 2020 -1000
Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610) (#9641)
* Fix NPE in RemoteTaskRunner event handler causes JVM shutdown
* address comments
* fix compile
* fix checkstyle
* fix lgtm
* fix merge
* fix test
* fix tests
* change scope
* address comments
* address comments
---
extensions-contrib/ambari-metrics-emitter/pom.xml | 12 ++
.../druid/indexing/overlord/RemoteTaskRunner.java | 232 ++++++++++++---------
.../indexing/overlord/RemoteTaskRunnerTest.java | 36 ++++
licenses.yaml | 2 +-
pom.xml | 2 +-
.../druid/curator/discovery/DiscoveryModule.java | 12 ++
6 files changed, 191 insertions(+), 105 deletions(-)
diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml
index 7cd69ea..fe94d09 100644
--- a/extensions-contrib/ambari-metrics-emitter/pom.xml
+++ b/extensions-contrib/ambari-metrics-emitter/pom.xml
@@ -131,6 +131,18 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${codehaus.jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${codehaus.jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index fe6e8be..dbaadf9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -45,6 +45,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
@@ -969,116 +970,141 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
// Add status listener to the watcher for status changes
- zkWorker.addListener(
- (client, event) -> {
- final String taskId;
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
- synchronized (statusLock) {
- try {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_UPDATED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- final TaskAnnouncement announcement = jsonMapper.readValue(
- event.getData().getData(), TaskAnnouncement.class
- );
-
- log.info(
- "Worker[%s] wrote %s status for task [%s] on [%s]",
- zkWorker.getWorker().getHost(),
- announcement.getTaskStatus().getStatusCode(),
- taskId,
- announcement.getTaskLocation()
- );
-
- // Synchronizing state with ZK
- statusLock.notifyAll();
-
- final RemoteTaskRunnerWorkItem tmp;
- if ((tmp = runningTasks.get(taskId)) != null) {
- taskRunnerWorkItem = tmp;
- } else {
- final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
- taskId,
- announcement.getTaskType(),
- zkWorker.getWorker(),
- TaskLocation.unknown(),
- announcement.getTaskDataSource()
- );
- final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
- taskId,
- newTaskRunnerWorkItem
- );
- if (existingItem == null) {
- log.warn(
- "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
- zkWorker.getWorker().getHost(),
- taskId
- );
- taskRunnerWorkItem = newTaskRunnerWorkItem;
- } else {
- taskRunnerWorkItem = existingItem;
- }
- }
-
- if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
- taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
- TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
- }
+ zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
+ zkWorker.start();
+ return retVal;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
- runPendingTasks();
- }
- break;
- case CHILD_REMOVED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- taskRunnerWorkItem = runningTasks.remove(taskId);
- if (taskRunnerWorkItem != null) {
- log.info("Task[%s] just disappeared!", taskId);
- taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
- } else {
- log.info("Task[%s] went bye bye.", taskId);
- }
- break;
- case INITIALIZED:
- if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
- retVal.set(zkWorker);
- } else {
- final String message = StringUtils.format(
- "WTF?! Tried to add already-existing worker[%s]",
- worker.getHost()
- );
- log.makeAlert(message)
- .addData("workerHost", worker.getHost())
- .addData("workerIp", worker.getIp())
- .emit();
- retVal.setException(new IllegalStateException(message));
- }
- runPendingTasks();
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
+ @VisibleForTesting
+ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
+ {
+ return (client, event) -> {
+ final String taskId;
+ final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+ synchronized (statusLock) {
+ try {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ if (event.getData() == null) {
+ log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+ log.makeAlert("Unexpected null for event.getData() in handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("eventType", event.getType().toString())
+ .emit();
+ return;
+ }
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ final TaskAnnouncement announcement = jsonMapper.readValue(
+ event.getData().getData(), TaskAnnouncement.class
+ );
+
+ log.info(
+ "Worker[%s] wrote %s status for task [%s] on [%s]",
+ zkWorker.getWorker().getHost(),
+ announcement.getTaskStatus().getStatusCode(),
+ taskId,
+ announcement.getTaskLocation()
+ );
+
+ // Synchronizing state with ZK
+ statusLock.notifyAll();
+
+ final RemoteTaskRunnerWorkItem tmp;
+ if ((tmp = runningTasks.get(taskId)) != null) {
+ taskRunnerWorkItem = tmp;
+ } else {
+ final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
+ taskId,
+ announcement.getTaskType(),
+ zkWorker.getWorker(),
+ TaskLocation.unknown(),
+ announcement.getTaskDataSource()
+ );
+ final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
+ taskId,
+ newTaskRunnerWorkItem
+ );
+ if (existingItem == null) {
+ log.warn(
+ "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
+ zkWorker.getWorker().getHost(),
+ taskId
+ );
+ taskRunnerWorkItem = newTaskRunnerWorkItem;
+ } else {
+ taskRunnerWorkItem = existingItem;
}
}
- catch (Exception e) {
- log.makeAlert(e, "Failed to handle new worker status")
+
+ if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+ taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+ TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
+ }
+
+ if (announcement.getTaskStatus().isComplete()) {
+ taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
+ runPendingTasks();
+ }
+ break;
+ case CHILD_REMOVED:
+ if (event.getData() == null) {
+ log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+ log.makeAlert("Unexpected null for event.getData() in handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
- .addData("znode", event.getData().getPath())
+ .addData("eventType", event.getType().toString())
.emit();
+ return;
}
- }
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ taskRunnerWorkItem = runningTasks.remove(taskId);
+ if (taskRunnerWorkItem != null) {
+ log.info("Task[%s] just disappeared!", taskId);
+ taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+ } else {
+ log.info("Task[%s] went bye bye.", taskId);
+ }
+ break;
+ case INITIALIZED:
+ if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+ retVal.set(zkWorker);
+ } else {
+ final String message = StringUtils.format(
+ "This should not happen...tried to add already-existing worker[%s]",
+ worker.getHost()
+ );
+ log.makeAlert(message)
+ .addData("workerHost", worker.getHost())
+ .addData("workerIp", worker.getIp())
+ .emit();
+ retVal.setException(new IllegalStateException(message));
+ }
+ runPendingTasks();
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_RECONNECTED:
+ case CONNECTION_LOST:
+ // do nothing
}
- );
- zkWorker.start();
- return retVal;
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ }
+ catch (Exception e) {
+ String znode = null;
+ if (event.getData() != null) {
+ znode = event.getData().getPath();
+ }
+ log.makeAlert(e, "Failed to handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("znode", znode)
+ .addData("eventType", event.getType().toString())
+ .emit();
+ }
+ }
+ };
}
/**
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 0601648..b23e063 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
@@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
@@ -55,6 +57,7 @@ import org.junit.rules.TestRule;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -944,4 +947,37 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskFuture2.get().isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
}
+
+ @Test
+ public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
+ {
+ // Set up mock emitter to verify log alert when exception is thrown inside the status listener
+ Worker worker = EasyMock.createMock(Worker.class);
+ EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
+ EasyMock.replay(worker);
+ ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
+ Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
+ emitter.emit(EasyMock.capture(capturedArgument));
+ EasyMock.expectLastCall().atLeastOnce();
+ EmittingLogger.registerEmitter(emitter);
+ EasyMock.replay(emitter);
+
+ PathChildrenCache cache = new PathChildrenCache(cf, "/test", true);
+ testStartWithNoWorker();
+ cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null));
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+ // Status listener will recieve event with null data
+ Assert.assertTrue(
+ TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
+ );
+
+ // Verify that the log emitter was called
+ EasyMock.verify(worker);
+ EasyMock.verify(emitter);
+ Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
+ Assert.assertTrue(alertDataMap.containsKey("znode"));
+ Assert.assertNull(alertDataMap.get("znode"));
+ // Status listener should successfully completes without throwing exception
+ }
}
diff --git a/licenses.yaml b/licenses.yaml
index bcdaa8c..3442697 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1315,7 +1315,7 @@ name: Apache Curator
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.3.0
libraries:
- org.apache.curator: curator-client
- org.apache.curator: curator-framework
diff --git a/pom.xml b/pom.xml
index 6d28e2f..391ac8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
<java.version>8</java.version>
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<aether.version>0.9.0.M2</aether.version>
- <apache.curator.version>4.1.0</apache.curator.version>
+ <apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version>
<apache.kafka.version>2.2.2</apache.kafka.version>
<avatica.version>1.15.0</avatica.version>
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
index fd89964..78c5669 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
@@ -482,6 +482,18 @@ public class DiscoveryModule implements Module
{
return this;
}
+
+ @Override
+ public ServiceProviderBuilder<T> executorService(ExecutorService executorService)
+ {
+ return this;
+ }
+
+ @Override
+ public ServiceProviderBuilder<T> executorService(CloseableExecutorService closeableExecutorService)
+ {
+ return this;
+ }
}
private static class NoopServiceProvider<T> implements ServiceProvider<T>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org