You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/02/15 00:54:23 UTC

[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379702969
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -151,127 +195,196 @@ protected void initializeIntermediaryDataManager() throws IOException
         ),
         null
     );
+    LocalShuffleClient shuffleClient = new LocalShuffleClient(intermediaryDataManager);
+    coordinatorClient = new LocalCoordinatorClient();
+    prepareObjectMapper(
+        objectMapper,
+        getIndexIO(),
+        indexingServiceClient,
+        indexTaskClientFactory,
+        shuffleClient,
+        coordinatorClient
+    );
   }
 
-  public class LocalIndexingServiceClient extends NoopIndexingServiceClient
+  @After
+  public void tearDownAbstractParallelIndexSupervisorTaskTest()
+  {
+    taskRunner.shutdown();
+    temporaryFolder.delete();
+  }
+
+  protected LocalIndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
+  protected IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> getParallelIndexTaskClientFactory()
+  {
+    return indexTaskClientFactory;
+  }
+
+  protected CoordinatorClient getCoordinatorClient()
   {
-    private final ConcurrentMap<String, Future<TaskStatus>> tasks = new ConcurrentHashMap<>();
+    return coordinatorClient;
+  }
+
+  private static class TaskContainer
+  {
+    private final Task task;
+    @MonotonicNonNull
+    private volatile Future<TaskStatus> statusFuture;
+    @MonotonicNonNull
+    private volatile TestLocalTaskActionClient actionClient;
+
+    private TaskContainer(Task task)
+    {
+      this.task = task;
+    }
+
+    private void setStatusFuture(Future<TaskStatus> statusFuture)
+    {
+      this.statusFuture = statusFuture;
+    }
+
+    private void setActionClient(TestLocalTaskActionClient actionClient)
+    {
+      this.actionClient = actionClient;
+    }
+  }
+
+  public class SimpleThreadingTaskRunner
+  {
+    private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
     private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(5, "parallel-index-supervisor-task-test-%d")
+        Execs.multiThreaded(5, "simple-threading-task-runner-%d")
     );
 
-    @Override
-    public String runTask(Object taskObject)
+    public String run(Task task)
+    {
+      runTask(task);
+      return task.getId();
+    }
+
+    private TaskStatus runAndWait(Task task)
     {
-      final Task subTask = (Task) taskObject;
       try {
-        getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
+        return runTask(task).get();
       }
-      catch (EntryExistsException e) {
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
         throw new RuntimeException(e);
       }
+    }
 
-      // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
-      // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
-      // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
-      tasks.put(subTask.getId(), service.submit(() -> {
-        try {
-          final TaskToolbox toolbox = createTaskToolbox(subTask);
-          if (subTask.isReady(toolbox.getTaskActionClient())) {
-            return subTask.run(toolbox);
-          } else {
-            getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
-            throw new ISE("task[%s] is not ready", subTask.getId());
-          }
-        }
-        catch (Exception e) {
-          getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
-          throw new RuntimeException(e);
+    private TaskStatus waitToFinish(Task task)
+    {
+      final TaskContainer taskContainer = tasks.get(task.getId());
+      if (taskContainer == null) {
+        throw new IAE("Unknown task[%s]", task.getId());
+      }
+      try {
+        while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
+          Thread.sleep(10);
         }
-      }));
-      return subTask.getId();
+        return taskContainer.statusFuture.get();
 
 Review comment:
   Previously, tests could specify a timeout, which is useful for failing tests sooner than the Travis CI timeout. The new `waitToFinish` calls `statusFuture.get()` without one.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org