You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/04/07 21:54:06 UTC

[druid] branch master updated: Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b95a1b9  Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610)
b95a1b9 is described below

commit b95a1b9878588ccc975fd6724a02b18b89c4d46f
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Tue Apr 7 11:53:51 2020 -1000

    Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610)
    
    * Fix NPE in RemoteTaskRunner event handler causes JVM shutdown
    
    * address comments
    
    * fix compile
    
    * fix checkstyle
    
    * fix lgtm
    
    * fix merge
    
    * fix test
    
    * fix tests
    
    * change scope
    
    * address comments
    
    * address comments
---
 extensions-contrib/ambari-metrics-emitter/pom.xml  |  12 ++
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 232 ++++++++++++---------
 .../indexing/overlord/RemoteTaskRunnerTest.java    |  36 ++++
 licenses.yaml                                      |   2 +-
 pom.xml                                            |   2 +-
 .../druid/curator/discovery/DiscoveryModule.java   |  12 ++
 6 files changed, 191 insertions(+), 105 deletions(-)

diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml
index 72786ed..aadf49d 100644
--- a/extensions-contrib/ambari-metrics-emitter/pom.xml
+++ b/extensions-contrib/ambari-metrics-emitter/pom.xml
@@ -131,6 +131,18 @@
       <artifactId>JUnitParams</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${codehaus.jackson.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${codehaus.jackson.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index fe6e8be..dbaadf9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -45,6 +45,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.curator.CuratorUtils;
@@ -969,116 +970,141 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
       );
 
       // Add status listener to the watcher for status changes
-      zkWorker.addListener(
-          (client, event) -> {
-            final String taskId;
-            final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
-            synchronized (statusLock) {
-              try {
-                switch (event.getType()) {
-                  case CHILD_ADDED:
-                  case CHILD_UPDATED:
-                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
-                    final TaskAnnouncement announcement = jsonMapper.readValue(
-                        event.getData().getData(), TaskAnnouncement.class
-                    );
-
-                    log.info(
-                        "Worker[%s] wrote %s status for task [%s] on [%s]",
-                        zkWorker.getWorker().getHost(),
-                        announcement.getTaskStatus().getStatusCode(),
-                        taskId,
-                        announcement.getTaskLocation()
-                    );
-
-                    // Synchronizing state with ZK
-                    statusLock.notifyAll();
-
-                    final RemoteTaskRunnerWorkItem tmp;
-                    if ((tmp = runningTasks.get(taskId)) != null) {
-                      taskRunnerWorkItem = tmp;
-                    } else {
-                      final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
-                          taskId,
-                          announcement.getTaskType(),
-                          zkWorker.getWorker(),
-                          TaskLocation.unknown(),
-                          announcement.getTaskDataSource()
-                      );
-                      final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
-                          taskId,
-                          newTaskRunnerWorkItem
-                      );
-                      if (existingItem == null) {
-                        log.warn(
-                            "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
-                            zkWorker.getWorker().getHost(),
-                            taskId
-                        );
-                        taskRunnerWorkItem = newTaskRunnerWorkItem;
-                      } else {
-                        taskRunnerWorkItem = existingItem;
-                      }
-                    }
-
-                    if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
-                      taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
-                      TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
-                    }
+      zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
+      zkWorker.start();
+      return retVal;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-                    if (announcement.getTaskStatus().isComplete()) {
-                      taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
-                      runPendingTasks();
-                    }
-                    break;
-                  case CHILD_REMOVED:
-                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
-                    taskRunnerWorkItem = runningTasks.remove(taskId);
-                    if (taskRunnerWorkItem != null) {
-                      log.info("Task[%s] just disappeared!", taskId);
-                      taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
-                      TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
-                    } else {
-                      log.info("Task[%s] went bye bye.", taskId);
-                    }
-                    break;
-                  case INITIALIZED:
-                    if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
-                      retVal.set(zkWorker);
-                    } else {
-                      final String message = StringUtils.format(
-                          "WTF?! Tried to add already-existing worker[%s]",
-                          worker.getHost()
-                      );
-                      log.makeAlert(message)
-                         .addData("workerHost", worker.getHost())
-                         .addData("workerIp", worker.getIp())
-                         .emit();
-                      retVal.setException(new IllegalStateException(message));
-                    }
-                    runPendingTasks();
-                    break;
-                  case CONNECTION_SUSPENDED:
-                  case CONNECTION_RECONNECTED:
-                  case CONNECTION_LOST:
-                    // do nothing
+  @VisibleForTesting
+  PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
+  {
+    return (client, event) -> {
+      final String taskId;
+      final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+      synchronized (statusLock) {
+        try {
+          switch (event.getType()) {
+            case CHILD_ADDED:
+            case CHILD_UPDATED:
+              if (event.getData() == null) {
+                log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+                log.makeAlert("Unexpected null for event.getData() in handle new worker status")
+                   .addData("worker", zkWorker.getWorker().getHost())
+                   .addData("eventType", event.getType().toString())
+                   .emit();
+                return;
+              }
+              taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+              final TaskAnnouncement announcement = jsonMapper.readValue(
+                  event.getData().getData(), TaskAnnouncement.class
+              );
+
+              log.info(
+                  "Worker[%s] wrote %s status for task [%s] on [%s]",
+                  zkWorker.getWorker().getHost(),
+                  announcement.getTaskStatus().getStatusCode(),
+                  taskId,
+                  announcement.getTaskLocation()
+              );
+
+              // Synchronizing state with ZK
+              statusLock.notifyAll();
+
+              final RemoteTaskRunnerWorkItem tmp;
+              if ((tmp = runningTasks.get(taskId)) != null) {
+                taskRunnerWorkItem = tmp;
+              } else {
+                final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
+                    taskId,
+                    announcement.getTaskType(),
+                    zkWorker.getWorker(),
+                    TaskLocation.unknown(),
+                    announcement.getTaskDataSource()
+                );
+                final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
+                    taskId,
+                    newTaskRunnerWorkItem
+                );
+                if (existingItem == null) {
+                  log.warn(
+                      "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
+                      zkWorker.getWorker().getHost(),
+                      taskId
+                  );
+                  taskRunnerWorkItem = newTaskRunnerWorkItem;
+                } else {
+                  taskRunnerWorkItem = existingItem;
                 }
               }
-              catch (Exception e) {
-                log.makeAlert(e, "Failed to handle new worker status")
+
+              if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+                taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+                TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
+              }
+
+              if (announcement.getTaskStatus().isComplete()) {
+                taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
+                runPendingTasks();
+              }
+              break;
+            case CHILD_REMOVED:
+              if (event.getData() == null) {
+                log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+                log.makeAlert("Unexpected null for event.getData() in handle new worker status")
                    .addData("worker", zkWorker.getWorker().getHost())
-                   .addData("znode", event.getData().getPath())
+                   .addData("eventType", event.getType().toString())
                    .emit();
+                return;
               }
-            }
+              taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+              taskRunnerWorkItem = runningTasks.remove(taskId);
+              if (taskRunnerWorkItem != null) {
+                log.info("Task[%s] just disappeared!", taskId);
+                taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+                TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+              } else {
+                log.info("Task[%s] went bye bye.", taskId);
+              }
+              break;
+            case INITIALIZED:
+              if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+                retVal.set(zkWorker);
+              } else {
+                final String message = StringUtils.format(
+                    "This should not happen...tried to add already-existing worker[%s]",
+                    worker.getHost()
+                );
+                log.makeAlert(message)
+                   .addData("workerHost", worker.getHost())
+                   .addData("workerIp", worker.getIp())
+                   .emit();
+                retVal.setException(new IllegalStateException(message));
+              }
+              runPendingTasks();
+              break;
+            case CONNECTION_SUSPENDED:
+            case CONNECTION_RECONNECTED:
+            case CONNECTION_LOST:
+              // do nothing
           }
-      );
-      zkWorker.start();
-      return retVal;
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+        }
+        catch (Exception e) {
+          String znode = null;
+          if (event.getData() != null) {
+            znode = event.getData().getPath();
+          }
+          log.makeAlert(e, "Failed to handle new worker status")
+             .addData("worker", zkWorker.getWorker().getHost())
+             .addData("znode", znode)
+             .addData("eventType", event.getType().toString())
+             .emit();
+        }
+      }
+    };
   }
 
   /**
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 0601648..b23e063 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IndexingServiceCondition;
@@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.testing.DeadlockDetectingTimeout;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.joda.time.Period;
 import org.junit.After;
@@ -55,6 +57,7 @@ import org.junit.rules.TestRule;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -944,4 +947,37 @@ public class RemoteTaskRunnerTest
     Assert.assertTrue(taskFuture2.get().isSuccess());
     Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
   }
+
+  @Test
+  public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
+  {
+    // Set up mock emitter to verify log alert when exception is thrown inside the status listener
+    Worker worker = EasyMock.createMock(Worker.class);
+    EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
+    EasyMock.replay(worker);
+    ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
+    Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
+    emitter.emit(EasyMock.capture(capturedArgument));
+    EasyMock.expectLastCall().atLeastOnce();
+    EmittingLogger.registerEmitter(emitter);
+    EasyMock.replay(emitter);
+
+    PathChildrenCache cache = new PathChildrenCache(cf, "/test", true);
+    testStartWithNoWorker();
+    cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null));
+    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+    // Status listener will receive an event with null data
+    Assert.assertTrue(
+        TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
+    );
+
+    // Verify that the log emitter was called
+    EasyMock.verify(worker);
+    EasyMock.verify(emitter);
+    Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
+    Assert.assertTrue(alertDataMap.containsKey("znode"));
+    Assert.assertNull(alertDataMap.get("znode"));
+    // Status listener should complete successfully without throwing an exception
+  }
 }
diff --git a/licenses.yaml b/licenses.yaml
index ad81dc4..bb51741 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1315,7 +1315,7 @@ name: Apache Curator
 license_category: binary
 module: java-core
 license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.3.0
 libraries:
   - org.apache.curator: curator-client
   - org.apache.curator: curator-framework
diff --git a/pom.xml b/pom.xml
index 0305e87..e26d58f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
         <java.version>8</java.version>
         <project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
         <aether.version>0.9.0.M2</aether.version>
-        <apache.curator.version>4.1.0</apache.curator.version>
+        <apache.curator.version>4.3.0</apache.curator.version>
         <apache.curator.test.version>2.12.0</apache.curator.test.version>
         <apache.kafka.version>2.2.2</apache.kafka.version>
         <apache.ranger.version>2.0.0</apache.ranger.version>
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
index fd89964..78c5669 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
@@ -482,6 +482,18 @@ public class DiscoveryModule implements Module
     {
       return this;
     }
+
+    @Override
+    public ServiceProviderBuilder<T> executorService(ExecutorService executorService)
+    {
+      return this;
+    }
+
+    @Override
+    public ServiceProviderBuilder<T> executorService(CloseableExecutorService closeableExecutorService)
+    {
+      return this;
+    }
   }
 
   private static class NoopServiceProvider<T> implements ServiceProvider<T>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org