Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/02/11 20:03:21 UTC

[GitHub] [druid] jihoonson opened a new pull request #9353: Inject things instead of subclassing everything for parallel task testing

jihoonson opened a new pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353
 
 
   ## Description
   
   We use a testing framework for parallel tasks that simulates parallel indexing with an executor service. The framework requires implementing subclasses for each particular test, which is annoying. This PR fixes the issue by injecting the necessary tools automatically. You can still turn off the injection by adding `disableInject` to the task context.
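
The opt-out described above can be pictured with a small sketch. Only the `disableInject` context key comes from this PR; `InjectingTestHarness`, `FakeTask`, and `prepare` are hypothetical names for illustration, and the real framework wires its collaborators differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Druid test framework.
final class InjectingTestHarness
{
  // The context key named in the PR description.
  static final String DISABLE_INJECT_CONTEXT_KEY = "disableInject";

  // Stand-in for a task that needs a collaborator supplied by the test harness.
  static final class FakeTask
  {
    final Map<String, Object> context;
    Object shuffleClient; // filled in by the harness unless injection is disabled

    FakeTask(Map<String, Object> context)
    {
      this.context = context;
    }
  }

  private final Object defaultShuffleClient;

  InjectingTestHarness(Object defaultShuffleClient)
  {
    this.defaultShuffleClient = defaultShuffleClient;
  }

  // Injects the default collaborator unless the task context opts out.
  FakeTask prepare(FakeTask task)
  {
    final boolean disabled = Boolean.TRUE.equals(task.context.get(DISABLE_INJECT_CONTEXT_KEY));
    if (!disabled) {
      task.shuffleClient = defaultShuffleClient;
    }
    return task;
  }

  public static void main(String[] args)
  {
    final InjectingTestHarness harness = new InjectingTestHarness(new Object());
    final Map<String, Object> context = new HashMap<>();
    context.put(DISABLE_INJECT_CONTEXT_KEY, true);
    final FakeTask task = harness.prepare(new FakeTask(context));
    System.out.println("injected: " + (task.shuffleClient != null));
  }
}
```

With this shape, a test that wants to supply its own collaborators sets the key in the context instead of subclassing the harness.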
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [x] added unit tests or modified existing tests to cover new code paths.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379882978
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
 ##########
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.EscalatedClient;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+public class HttpShuffleClient implements ShuffleClient
+{
+  private static final int BUFFER_SIZE = 1024 * 4;
+  private static final int NUM_FETCH_RETRIES = 3;
+
+  private final HttpClient httpClient;
+
+  @Inject
+  public HttpShuffleClient(@EscalatedClient HttpClient httpClient)
+  {
+    this.httpClient = httpClient;
+  }
+
+  @Override
+  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
+      File partitionDir,
+      String supervisorTaskId,
+      P location
+  ) throws IOException
+  {
+    final byte[] buffer = new byte[BUFFER_SIZE];
 
 Review comment:
   With the Indexer, this method can be called by multiple threads at the same time, so a buffer shared as a class member would not be safe. Added comments.
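
A minimal sketch of the per-call buffer, assuming a simplified stream copy loop. `PerCallBufferCopier` and `copy` are hypothetical names and this is not the body of `fetchSegmentFile`; only `BUFFER_SIZE` mirrors the constant in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for a fetch loop; not the Druid implementation.
final class PerCallBufferCopier
{
  private static final int BUFFER_SIZE = 1024 * 4;

  // Each call allocates its own scratch buffer, so concurrent callers
  // never overwrite each other's partially read bytes.
  static long copy(InputStream in, OutputStream out) throws IOException
  {
    final byte[] buffer = new byte[BUFFER_SIZE];
    long total = 0;
    int read;
    while ((read = in.read(buffer)) != -1) {
      out.write(buffer, 0, read);
      total += read;
    }
    return total;
  }

  public static void main(String[] args) throws IOException
  {
    final byte[] data = new byte[10_000];
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final long copied = copy(new ByteArrayInputStream(data), out);
    System.out.println("copied bytes: " + copied);
  }
}
```

The trade-off is one 4 KiB allocation per call in exchange for not needing any synchronization around the buffer.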



[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379695817
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
 ##########
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.EscalatedClient;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+public class HttpShuffleClient implements ShuffleClient
+{
+  private static final int BUFFER_SIZE = 1024 * 4;
+  private static final int NUM_FETCH_RETRIES = 3;
+
+  private final HttpClient httpClient;
+
+  @Inject
+  public HttpShuffleClient(@EscalatedClient HttpClient httpClient)
+  {
+    this.httpClient = httpClient;
+  }
+
+  @Override
+  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
+      File partitionDir,
+      String supervisorTaskId,
+      P location
+  ) throws IOException
+  {
+    final byte[] buffer = new byte[BUFFER_SIZE];
 
 Review comment:
   This was previously a class member variable, so it was only allocated once. Is there a reason that's changed here?



[GitHub] [druid] jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379883008
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 ##########
 @@ -197,62 +182,52 @@ private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity s
   }
 
   @Test
-  public void testWithoutInterval() throws Exception
+  public void testWithoutInterval()
   {
     testRunAndOverwrite(null, Granularities.DAY);
   }
 
   @Test()
-  public void testRunInParallel() throws Exception
+  public void testRunInParallel()
   {
     // Ingest all data.
-    testRunAndOverwrite(Intervals.of("2017/2018"), Granularities.DAY);
+    testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
 
 Review comment:
   Hmm, since the test data is generated in each class, I think it's better to have the constant separately in each class as well. Added constants.



[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379702277
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -35,53 +43,80 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
 import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.worker.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.join.NoopJoinableFactory;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.security.AllowAllAuthorizer;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
 {
+  static final String DISABLE_INJECT_CONTEXT_KEY = "disableInject";
 
 Review comment:
   Suggestion: Rename to something like `DISABLE_TASK_INJECT_CONTEXT_KEY`.



[GitHub] [druid] jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379882987
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
 ##########
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.EscalatedClient;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+public class HttpShuffleClient implements ShuffleClient
+{
+  private static final int BUFFER_SIZE = 1024 * 4;
+  private static final int NUM_FETCH_RETRIES = 3;
+
+  private final HttpClient httpClient;
+
+  @Inject
+  public HttpShuffleClient(@EscalatedClient HttpClient httpClient)
+  {
+    this.httpClient = httpClient;
+  }
+
+  @Override
+  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
 
 Review comment:
   Added some.



[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379702969
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -151,127 +195,196 @@ protected void initializeIntermediaryDataManager() throws IOException
         ),
         null
     );
+    LocalShuffleClient shuffleClient = new LocalShuffleClient(intermediaryDataManager);
+    coordinatorClient = new LocalCoordinatorClient();
+    prepareObjectMapper(
+        objectMapper,
+        getIndexIO(),
+        indexingServiceClient,
+        indexTaskClientFactory,
+        shuffleClient,
+        coordinatorClient
+    );
   }
 
-  public class LocalIndexingServiceClient extends NoopIndexingServiceClient
+  @After
+  public void tearDownAbstractParallelIndexSupervisorTaskTest()
+  {
+    taskRunner.shutdown();
+    temporaryFolder.delete();
+  }
+
+  protected LocalIndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
+  protected IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> getParallelIndexTaskClientFactory()
+  {
+    return indexTaskClientFactory;
+  }
+
+  protected CoordinatorClient getCoordinatorClient()
   {
-    private final ConcurrentMap<String, Future<TaskStatus>> tasks = new ConcurrentHashMap<>();
+    return coordinatorClient;
+  }
+
+  private static class TaskContainer
+  {
+    private final Task task;
+    @MonotonicNonNull
+    private volatile Future<TaskStatus> statusFuture;
+    @MonotonicNonNull
+    private volatile TestLocalTaskActionClient actionClient;
+
+    private TaskContainer(Task task)
+    {
+      this.task = task;
+    }
+
+    private void setStatusFuture(Future<TaskStatus> statusFuture)
+    {
+      this.statusFuture = statusFuture;
+    }
+
+    private void setActionClient(TestLocalTaskActionClient actionClient)
+    {
+      this.actionClient = actionClient;
+    }
+  }
+
+  public class SimpleThreadingTaskRunner
+  {
+    private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
     private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(5, "parallel-index-supervisor-task-test-%d")
+        Execs.multiThreaded(5, "simple-threading-task-runner-%d")
     );
 
-    @Override
-    public String runTask(Object taskObject)
+    public String run(Task task)
+    {
+      runTask(task);
+      return task.getId();
+    }
+
+    private TaskStatus runAndWait(Task task)
     {
-      final Task subTask = (Task) taskObject;
       try {
-        getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
+        return runTask(task).get();
       }
-      catch (EntryExistsException e) {
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
         throw new RuntimeException(e);
       }
+    }
 
-      // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
-      // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
-      // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
-      tasks.put(subTask.getId(), service.submit(() -> {
-        try {
-          final TaskToolbox toolbox = createTaskToolbox(subTask);
-          if (subTask.isReady(toolbox.getTaskActionClient())) {
-            return subTask.run(toolbox);
-          } else {
-            getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
-            throw new ISE("task[%s] is not ready", subTask.getId());
-          }
-        }
-        catch (Exception e) {
-          getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
-          throw new RuntimeException(e);
+    private TaskStatus waitToFinish(Task task)
+    {
+      final TaskContainer taskContainer = tasks.get(task.getId());
+      if (taskContainer == null) {
+        throw new IAE("Unknown task[%s]", task.getId());
+      }
+      try {
+        while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
+          Thread.sleep(10);
         }
-      }));
-      return subTask.getId();
+        return taskContainer.statusFuture.get();
 
 Review comment:
   Previously, tests could specify a timeout, which is useful for failing tests sooner than the Travis timeout.
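
One way to bring a timeout back is the timed overload of `Future.get`. The sketch below is an assumption about how that could look, not code from this PR; `TimedWait` and its `waitToFinish` signature are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the PR's waitToFinish takes a Task, not a Future.
final class TimedWait
{
  // Waits for the future up to the given timeout and fails fast if it is exceeded.
  static <T> T waitToFinish(Future<T> future, long timeout, TimeUnit unit)
  {
    try {
      return future.get(timeout, unit);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
    catch (TimeoutException e) {
      // Cancel the work so a stuck task does not outlive the test.
      future.cancel(true);
      throw new IllegalStateException("task did not finish within " + timeout + " " + unit, e);
    }
  }

  public static void main(String[] args)
  {
    final ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
      final Future<String> status = exec.submit(() -> "SUCCESS");
      System.out.println(waitToFinish(status, 1, TimeUnit.SECONDS));
    }
    finally {
      exec.shutdownNow();
    }
  }
}
```

A test would pass a bound well under the CI job limit, so a hung subtask surfaces as a test failure rather than a build timeout.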



[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379695930
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
 ##########
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.EscalatedClient;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+public class HttpShuffleClient implements ShuffleClient
+{
+  private static final int BUFFER_SIZE = 1024 * 4;
+  private static final int NUM_FETCH_RETRIES = 3;
+
+  private final HttpClient httpClient;
+
+  @Inject
+  public HttpShuffleClient(@EscalatedClient HttpClient httpClient)
+  {
+    this.httpClient = httpClient;
+  }
+
+  @Override
+  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
 
 Review comment:
   Is it worth adding unit tests for this method?



[GitHub] [druid] jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379882989
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -35,53 +43,80 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
 import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.worker.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.join.NoopJoinableFactory;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.security.AllowAllAuthorizer;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
 {
+  static final String DISABLE_INJECT_CONTEXT_KEY = "disableInject";
 
 Review comment:
   Done.



[GitHub] [druid] jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379883000
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -151,127 +195,196 @@ protected void initializeIntermediaryDataManager() throws IOException
         ),
         null
     );
+    LocalShuffleClient shuffleClient = new LocalShuffleClient(intermediaryDataManager);
+    coordinatorClient = new LocalCoordinatorClient();
+    prepareObjectMapper(
+        objectMapper,
+        getIndexIO(),
+        indexingServiceClient,
+        indexTaskClientFactory,
+        shuffleClient,
+        coordinatorClient
+    );
   }
 
-  public class LocalIndexingServiceClient extends NoopIndexingServiceClient
+  @After
+  public void tearDownAbstractParallelIndexSupervisorTaskTest()
+  {
+    taskRunner.shutdown();
+    temporaryFolder.delete();
+  }
+
+  protected LocalIndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
+  protected IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> getParallelIndexTaskClientFactory()
+  {
+    return indexTaskClientFactory;
+  }
+
+  protected CoordinatorClient getCoordinatorClient()
   {
-    private final ConcurrentMap<String, Future<TaskStatus>> tasks = new ConcurrentHashMap<>();
+    return coordinatorClient;
+  }
+
+  private static class TaskContainer
+  {
+    private final Task task;
+    @MonotonicNonNull
+    private volatile Future<TaskStatus> statusFuture;
+    @MonotonicNonNull
+    private volatile TestLocalTaskActionClient actionClient;
+
+    private TaskContainer(Task task)
+    {
+      this.task = task;
+    }
+
+    private void setStatusFuture(Future<TaskStatus> statusFuture)
+    {
+      this.statusFuture = statusFuture;
+    }
+
+    private void setActionClient(TestLocalTaskActionClient actionClient)
+    {
+      this.actionClient = actionClient;
+    }
+  }
+
+  public class SimpleThreadingTaskRunner
+  {
+    private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
     private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(5, "parallel-index-supervisor-task-test-%d")
+        Execs.multiThreaded(5, "simple-threading-task-runner-%d")
     );
 
-    @Override
-    public String runTask(Object taskObject)
+    public String run(Task task)
+    {
+      runTask(task);
+      return task.getId();
+    }
+
+    private TaskStatus runAndWait(Task task)
     {
-      final Task subTask = (Task) taskObject;
       try {
-        getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
+        return runTask(task).get();
       }
-      catch (EntryExistsException e) {
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
         throw new RuntimeException(e);
       }
+    }
 
-      // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
-      // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
-      // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
-      tasks.put(subTask.getId(), service.submit(() -> {
-        try {
-          final TaskToolbox toolbox = createTaskToolbox(subTask);
-          if (subTask.isReady(toolbox.getTaskActionClient())) {
-            return subTask.run(toolbox);
-          } else {
-            getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
-            throw new ISE("task[%s] is not ready", subTask.getId());
-          }
-        }
-        catch (Exception e) {
-          getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
-          throw new RuntimeException(e);
+    private TaskStatus waitToFinish(Task task)
+    {
+      final TaskContainer taskContainer = tasks.get(task.getId());
+      if (taskContainer == null) {
+        throw new IAE("Unknown task[%s]", task.getId());
+      }
+      try {
+        while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
+          Thread.sleep(10);
         }
-      }));
-      return subTask.getId();
+        return taskContainer.statusFuture.get();
 
 Review comment:
   Added the timeout back.
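
For readers following the thread, here is a minimal sketch of a bounded wait. It is not the code from the PR. The class `BoundedWaitSketch`, the `AtomicReference` holder, and the parameter names are all hypothetical stand-ins for the `statusFuture` field polled in the diff above. It polls until the future has been published, then spends whatever is left of the same deadline waiting for the result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class BoundedWaitSketch
{
  // Hypothetical helper (not from the PR): waits until a future is published in the
  // holder, then waits for its result, all within a single overall deadline.
  static <T> T waitToFinish(AtomicReference<Future<T>> holder, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException
  {
    final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    Future<T> future;
    while ((future = holder.get()) == null) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new TimeoutException("future was not set before the deadline");
      }
      // Thread.sleep throws InterruptedException if the waiting thread is interrupted,
      // so the loop never falls through with a null future.
      Thread.sleep(10);
    }
    final long remainingNanos = Math.max(0, deadlineNanos - System.nanoTime());
    return future.get(remainingNanos, TimeUnit.NANOSECONDS);
  }
}
```

Using one deadline for both phases keeps the total wait bounded even when the future is published late. Without it, a stuck task could hang the test until the build itself times out.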



[GitHub] [druid] jihoonson merged pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353
 
 
   



[GitHub] [druid] ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9353: Inject things instead of subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379703293
 
 

 ##########
 File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 ##########
 @@ -197,62 +182,52 @@ private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity s
   }
 
   @Test
-  public void testWithoutInterval() throws Exception
+  public void testWithoutInterval()
   {
     testRunAndOverwrite(null, Granularities.DAY);
   }
 
   @Test()
-  public void testRunInParallel() throws Exception
+  public void testRunInParallel()
   {
     // Ingest all data.
-    testRunAndOverwrite(Intervals.of("2017/2018"), Granularities.DAY);
+    testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
 
 Review comment:
   If the interval had been a named constant for the class, then updating it for all of the tests would have been easier. Perhaps it's worth creating the named constant now.
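
A rough sketch of what the suggestion could look like. The constant name `INTERVAL_TO_INDEX` and the class `IntervalConstantSketch` are hypothetical. Only the interval string `2017-12/P1M` comes from the diff above. To keep the example self-contained it parses the string with `java.time`, where the real tests would presumably pass it to `Intervals.of`.

```java
import java.time.LocalDate;
import java.time.Period;
import java.time.YearMonth;

public class IntervalConstantSketch
{
  // Hypothetical constant name; the value is the interval string from the diff.
  // Every test refers to this one definition, so a change is made in a single place.
  static final String INTERVAL_TO_INDEX = "2017-12/P1M";

  // Inclusive start of a "yyyy-MM/<ISO-8601 period>" interval string.
  static LocalDate start(String interval)
  {
    return YearMonth.parse(interval.split("/")[0]).atDay(1);
  }

  // Exclusive end: the start plus the period after the slash.
  static LocalDate end(String interval)
  {
    final String[] parts = interval.split("/");
    return YearMonth.parse(parts[0]).atDay(1).plus(Period.parse(parts[1]));
  }
}
```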
