You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/11/01 15:30:48 UTC
[incubator-druid] branch 0.13.0-incubating updated: Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
new e96df6e Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)
e96df6e is described below
commit e96df6e727b0981190571b3a0677c733d95b15c4
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Thu Nov 1 08:30:38 2018 -0700
Fix NPE in TaskLockbox that prevents overlord leadership (#6512) (#6564)
* fix NPE that prevents overlord from assuming leadership if extension that provides indexing task type is not loaded
* heh
---
.../indexing/overlord/MetadataTaskStorage.java | 4 +-
.../druid/indexing/overlord/TaskLockboxTest.java | 96 ++++++++++++++++++++++
2 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 2c836bd..808fdb7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -190,9 +190,11 @@ public class MetadataTaskStorage implements TaskStorage
@Override
public List<Task> getActiveTasks()
{
+ // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
+ // and don't know what to do with the payload, so we won't be able to make use of it anyway
return handler.getActiveTaskInfo(null)
.stream()
- .filter(taskInfo -> taskInfo.getStatus().isRunnable())
+ .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
.map(TaskInfo::getTask)
.collect(Collectors.toList());
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e20df5a..790e00f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -23,11 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -343,6 +348,46 @@ public class TaskLockboxTest
}
@Test
+ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception
+ {
+ // ensure that if we don't know how to deserialize a task it won't explode the lockbox
+ // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things)
+ final TestDerbyConnector derbyConnector = derby.getConnector();
+ ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new TheModule());
+ TaskStorage loadedTaskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derby.metadataTablesConfigSupplier().get(),
+ loadedMapper
+ )
+ );
+
+ TaskLockbox theBox = new TaskLockbox(taskStorage);
+ TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage);
+
+ Task aTask = NoopTask.create();
+ taskStorage.insert(aTask, TaskStatus.running(aTask.getId()));
+ theBox.add(aTask);
+ loadedBox.add(aTask);
+
+ Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo");
+ loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId()));
+ theBox.add(theTask);
+ loadedBox.add(theTask);
+
+ List<Task> tasks = taskStorage.getActiveTasks();
+ List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks();
+
+ theBox.syncFromStorage();
+ loadedBox.syncFromStorage();
+
+ Assert.assertEquals(1, tasks.size());
+ Assert.assertEquals(2, tasksFromLoaded.size());
+ }
+
+ @Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage);
@@ -648,4 +693,55 @@ public class TaskLockboxTest
return super.isRevoked();
}
}
+
+ private static String TASK_NAME = "myModuleIsntLoadedTask";
+ private static class TheModule extends SimpleModule
+ {
+ public TheModule()
+ {
+
+ registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME));
+ }
+ }
+
+ private static class MyModuleIsntLoadedTask extends AbstractTask
+ {
+ private String someProp;
+
+ @JsonCreator
+ protected MyModuleIsntLoadedTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("context") Map<String, Object> context,
+ @JsonProperty("someProp") String someProp
+ )
+ {
+ super(id, dataSource, context);
+ this.someProp = someProp;
+ }
+
+ @JsonProperty
+ public String getSomeProp()
+ {
+ return someProp;
+ }
+
+ @Override
+ public String getType()
+ {
+ return TASK_NAME;
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ return true;
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ return TaskStatus.failure("how?");
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org