You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/11/01 15:30:48 UTC

[incubator-druid] branch 0.13.0-incubating updated: Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
     new e96df6e  Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)
e96df6e is described below

commit e96df6e727b0981190571b3a0677c733d95b15c4
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Thu Nov 1 08:30:38 2018 -0700

    Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)
    
    * fix NPE that prevents overlord from assuming leadership if extension that provides indexing task type is not loaded
    
    * heh
---
 .../indexing/overlord/MetadataTaskStorage.java     |  4 +-
 .../druid/indexing/overlord/TaskLockboxTest.java   | 96 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 2c836bd..808fdb7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -190,9 +190,11 @@ public class MetadataTaskStorage implements TaskStorage
   @Override
   public List<Task> getActiveTasks()
   {
+    // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
+    // and don't know what to do with the payload, so we won't be able to make use of it anyway
     return handler.getActiveTaskInfo(null)
            .stream()
-           .filter(taskInfo -> taskInfo.getStatus().isRunnable())
+           .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
            .map(TaskInfo::getTask)
            .collect(Collectors.toList());
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e20df5a..790e00f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -23,11 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -343,6 +348,46 @@ public class TaskLockboxTest
   }
 
   @Test
+  public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception
+  {
+    // ensure that if we don't know how to deserialize a task it won't explode the lockbox
+    // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things)
+    final TestDerbyConnector derbyConnector = derby.getConnector();
+    ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new TheModule());
+    TaskStorage loadedTaskStorage = new MetadataTaskStorage(
+        derbyConnector,
+        new TaskStorageConfig(null),
+        new DerbyMetadataStorageActionHandlerFactory(
+            derbyConnector,
+            derby.metadataTablesConfigSupplier().get(),
+            loadedMapper
+        )
+    );
+
+    TaskLockbox theBox = new TaskLockbox(taskStorage);
+    TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage);
+
+    Task aTask = NoopTask.create();
+    taskStorage.insert(aTask, TaskStatus.running(aTask.getId()));
+    theBox.add(aTask);
+    loadedBox.add(aTask);
+
+    Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo");
+    loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId()));
+    theBox.add(theTask);
+    loadedBox.add(theTask);
+
+    List<Task> tasks = taskStorage.getActiveTasks();
+    List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks();
+
+    theBox.syncFromStorage();
+    loadedBox.syncFromStorage();
+
+    Assert.assertEquals(1, tasks.size());
+    Assert.assertEquals(2, tasksFromLoaded.size());
+  }
+
+  @Test
   public void testRevokedLockSyncFromStorage() throws EntryExistsException
   {
     final TaskLockbox originalBox = new TaskLockbox(taskStorage);
@@ -648,4 +693,55 @@ public class TaskLockboxTest
       return super.isRevoked();
     }
   }
+
+  private static String TASK_NAME = "myModuleIsntLoadedTask";
+  private static class TheModule extends SimpleModule
+  {
+    public TheModule()
+    {
+
+      registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME));
+    }
+  }
+
+  private static class MyModuleIsntLoadedTask extends AbstractTask
+  {
+    private String someProp;
+
+    @JsonCreator
+    protected MyModuleIsntLoadedTask(
+        @JsonProperty("id") String id,
+        @JsonProperty("dataSource") String dataSource,
+        @JsonProperty("context") Map<String, Object> context,
+        @JsonProperty("someProp") String someProp
+    )
+    {
+      super(id, dataSource, context);
+      this.someProp = someProp;
+    }
+
+    @JsonProperty
+    public String getSomeProp()
+    {
+      return someProp;
+    }
+
+    @Override
+    public String getType()
+    {
+      return TASK_NAME;
+    }
+
+    @Override
+    public boolean isReady(TaskActionClient taskActionClient)
+    {
+      return true;
+    }
+
+    @Override
+    public TaskStatus run(TaskToolbox toolbox)
+    {
+      return TaskStatus.failure("how?");
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org