Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/07/23 00:41:29 UTC

[GitHub] [druid] loquisgon opened a new pull request #11486: Add error msg to parallel task's TaskStatus

loquisgon opened a new pull request #11486:
URL: https://github.com/apache/druid/pull/11486


   Fixes #XXXX.
   
   
   ### Description
   
   
   #### Fixed the bug ...
   #### Renamed the class ...
   #### Added a forbidden-apis entry ...
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `MyFoo`
    * `OurBar`
    * `TheirBaz`
   
   <hr>
   
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon closed pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
loquisgon closed pull request #11486:
URL: https://github.com/apache/druid/pull/11486


   




[GitHub] [druid] loquisgon commented on a change in pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11486:
URL: https://github.com/apache/druid/pull/11486#discussion_r680233569



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Force and verify the failure modes for hash partitioning task
+ */
+public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
+{
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+  );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim1", "dim2", "val"),
+      null,
+      false,
+      false,
+      0
+  );
+  private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
+
+  private File inputDir;
+
+
+  public HashPartitionTaskKillTest()
+  {
+    super(LockGranularity.TIME_CHUNK, true, 0, 0);
+  }
+
+  @Before
+  public void setup() throws IOException
+  {
+    inputDir = temporaryFolder.newFolder("data");
+    final Set<Interval> intervals = new HashSet<>();
+    // set up data
+    for (int i = 0; i < 10; i++) {
+      try (final Writer writer =
+               Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+        for (int j = 0; j < 10; j++) {
+          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
+          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
+        }
+      }
+    }
+
+    for (int i = 0; i < 5; i++) {
+      try (final Writer writer =
+               Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
+        writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
+      }
+    }
+    // sorted input intervals
+    List<Interval> inputIntervals = new ArrayList<>(intervals);
+    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
+  }
+
+  @Test(timeout = 5000L)
+  public void failsInFirstPhase() throws Exception
+  {
+    final ParallelIndexSupervisorTask task =
+        newTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, null, INTERVAL_TO_INDEX, inputDir,
+                "test_*",
+                new HashedPartitionsSpec(null, null, // num shards is null to force it to go to first phase
+                                         ImmutableList.of("dim1", "dim2")
+                ),
+                2, false, true, 0
+        );
+
+    final TaskActionClient actionClient = createActionClient(task);
+    final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
+
+    prepareTaskForLocking(task);
+    Assert.assertTrue(task.isReady(actionClient));
+    task.stopGracefully(null);
+
+    TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);
+
+    Assert.assertTrue(taskStatus.isFailure());
+    Assert.assertEquals(
+        "Failed in phase[TestRunner[false]]. See task logs for details.",

Review comment:
       The phases are more evident now, after my last commit.

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Force and verify the failure modes for range partitioning task
+ */
+public class RangePartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
+{
+  private static final int NUM_PARTITION = 2;
+  private static final int NUM_ROW = 20;
+  private static final int DIM_FILE_CARDINALITY = 2;
+  private static final int YEAR = 2017;
+  private static final Interval INTERVAL_TO_INDEX = Intervals.of("%s-12/P1M", YEAR);
+  private static final String TIME = "ts";
+  private static final String DIM1 = "dim1";
+  private static final String DIM2 = "dim2";
+  private static final String LIST_DELIMITER = "|";
+  private static final String TEST_FILE_NAME_PREFIX = "test_";
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
+  );
+
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      false,
+      false,
+      0
+  );
+
+  private File inputDir;
+
+  public RangePartitionTaskKillTest()
+  {
+    super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
+  }
+
+  @Before
+  public void setup() throws IOException
+  {
+    inputDir = temporaryFolder.newFolder("data");
+  }
+
+  @Test(timeout = 5000L)
+  public void failsFirstPhase() throws Exception
+  {
+    int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
+    final ParallelIndexSupervisorTask task =
+        newTask(TIMESTAMP_SPEC,
+                DIMENSIONS_SPEC,
+                INPUT_FORMAT,
+                null,
+                INTERVAL_TO_INDEX,
+                inputDir,
+                TEST_FILE_NAME_PREFIX + "*",
+                new SingleDimensionPartitionsSpec(
+                    targetRowsPerSegment,
+                    null,
+                    DIM1,
+                    false
+                ),
+                2,
+                false,
+                0
+        );
+
+    final TaskActionClient actionClient = createActionClient(task);
+    final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
+
+    prepareTaskForLocking(task);
+    Assert.assertTrue(task.isReady(actionClient));
+    task.stopGracefully(null);
+
+
+    TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);
+
+    Assert.assertTrue(taskStatus.isFailure());
+    Assert.assertEquals(
+        "Failed in phase[TestRunner[false]]. See task logs for details.",

Review comment:
       Done






[GitHub] [druid] loquisgon commented on a change in pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11486:
URL: https://github.com/apache/druid/pull/11486#discussion_r680111320



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -555,14 +559,23 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
     );
 
     final TaskState state = runNextPhase(runner);
+    TaskStatus taskStatus;
     if (state.isSuccess()) {
       //noinspection ConstantConditions
       publishSegments(toolbox, runner.getReports());
       if (awaitSegmentAvailabilityTimeoutMillis > 0) {
         waitForSegmentAvailability(runner.getReports());
       }
+      taskStatus = TaskStatus.success(getId());
+    } else {
+      // there is only success or failure after running....
+      Preconditions.checkState(state.isFailure());

Review comment:
       Today there is no state other than failure once a task has run and was not successful. I added a message to the exception for the case where such a state is added later, so we know that some code needs to change; throwing the exception lets whoever adds the state update the code appropriately for it.
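
As a rough illustration of the check discussed in this comment, here is a minimal, self-contained sketch. It is not the code from the PR: `SketchTaskState` and `PhaseOutcome` are hypothetical stand-ins, and a plain `IllegalStateException` is used in place of Guava's `Preconditions.checkState` so the example has no dependencies. The failure text mirrors the message asserted in the tests quoted in this thread.

```java
import java.util.Locale;

// Hypothetical stand-in for a task state enum; not the Druid class itself.
enum SketchTaskState {
  RUNNING, SUCCESS, FAILED;

  boolean isSuccess() { return this == SUCCESS; }

  boolean isFailure() { return this == FAILED; }
}

// Hypothetical helper that turns a final phase state into a status message.
final class PhaseOutcome {
  private PhaseOutcome() {}

  static String describe(SketchTaskState state, String phaseName) {
    if (state.isSuccess()) {
      return "SUCCESS";
    }
    // Only success or failure is expected after a phase has run. If a new
    // state is ever introduced, fail loudly and say which state was seen.
    if (!state.isFailure()) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Unexpected state[%s] after phase[%s]; expected SUCCESS or FAILED",
              state,
              phaseName
          )
      );
    }
    return String.format(Locale.ROOT, "Failed in phase[%s]. See task logs for details.", phaseName);
  }
}
```

Including the observed state in the exception message means that whoever adds a new state later can see from the stack trace alone which branch needs handling.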






[GitHub] [druid] jihoonson commented on pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11486:
URL: https://github.com/apache/druid/pull/11486#issuecomment-891177517


   I restarted failed ones as they seem flaky.




[GitHub] [druid] loquisgon commented on a change in pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11486:
URL: https://github.com/apache/druid/pull/11486#discussion_r680111320



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -555,14 +559,23 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
     );
 
     final TaskState state = runNextPhase(runner);
+    TaskStatus taskStatus;
     if (state.isSuccess()) {
       //noinspection ConstantConditions
       publishSegments(toolbox, runner.getReports());
       if (awaitSegmentAvailabilityTimeoutMillis > 0) {
         waitForSegmentAvailability(runner.getReports());
       }
+      taskStatus = TaskStatus.success(getId());
+    } else {
+      // there is only success or failure after running....
+      Preconditions.checkState(state.isFailure());

Review comment:
       Today there is no state other than failure once a task has run and was not successful. I added a log message for the case where such a state is added later, so we know that some code needs to change; throwing an exception also lets whoever adds the state update the code appropriately for it.






[GitHub] [druid] jihoonson merged pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #11486:
URL: https://github.com/apache/druid/pull/11486


   




[GitHub] [druid] lgtm-com[bot] commented on pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #11486:
URL: https://github.com/apache/druid/pull/11486#issuecomment-885346333


   This pull request **introduces 1 alert** when merging 3461ad2eb64cf58d07c700f5b5a8055ea406375b into ce1faa56352766522610e3ae306c15b55df19fb0 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-6d6b5760bf47a1d732d50a0bd82f6d930c1dd06c)
   
   **new alerts:**
   
   * 1 for Unused format argument




[GitHub] [druid] loquisgon commented on pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
loquisgon commented on pull request #11486:
URL: https://github.com/apache/druid/pull/11486#issuecomment-888700324


   Note that we decided not to put information about successful/failed/complete/etc tasks in the TaskStatus since the errMsg gets truncated. We will put that info in the TaskReport in the future.
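
To make the truncation concern concrete, here is a minimal sketch of what a length-capped error message looks like. The class name, the limit of 100 characters, and the `...` suffix are all assumptions chosen for illustration; they are not taken from this PR.

```java
// Hypothetical helper illustrating why long per-subtask details do not fit
// in a length-capped error message field.
final class ErrorMsgSketch {
  // Assumed limit for illustration only.
  static final int MAX_LEN = 100;

  private ErrorMsgSketch() {}

  static String truncate(String errorMsg) {
    if (errorMsg == null || errorMsg.length() <= MAX_LEN) {
      return errorMsg;
    }
    // Keep the first MAX_LEN characters and mark that the rest was dropped.
    return errorMsg.substring(0, MAX_LEN) + "...";
  }
}
```

Under a cap like this, a short message such as the phase name survives intact, while a list of per-subtask outcomes would be cut off, which is consistent with moving that detail to a report instead.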




[GitHub] [druid] jihoonson commented on a change in pull request #11486: Add error msg to parallel task's TaskStatus

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11486:
URL: https://github.com/apache/druid/pull/11486#discussion_r679569432



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
+import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Force and verify the failure modes for range partitioning task
+ */
+public class RangePartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
+{
+  private static final int NUM_PARTITION = 2;
+  private static final int NUM_ROW = 20;
+  private static final int DIM_FILE_CARDINALITY = 2;
+  private static final int YEAR = 2017;
+  private static final Interval INTERVAL_TO_INDEX = Intervals.of("%s-12/P1M", YEAR);
+  private static final String TIME = "ts";
+  private static final String DIM1 = "dim1";
+  private static final String DIM2 = "dim2";
+  private static final String LIST_DELIMITER = "|";
+  private static final String TEST_FILE_NAME_PREFIX = "test_";
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
+  );
+
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      false,
+      false,
+      0
+  );
+
+  private File inputDir;
+
+  public RangePartitionTaskKillTest()
+  {
+    super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
+  }
+
+  @Before
+  public void setup() throws IOException
+  {
+    inputDir = temporaryFolder.newFolder("data");
+  }
+
+  @Test(timeout = 5000L)
+  public void failsFirstPhase() throws Exception
+  {
+    int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
+    final ParallelIndexSupervisorTask task =
+        newTask(TIMESTAMP_SPEC,
+                DIMENSIONS_SPEC,
+                INPUT_FORMAT,
+                null,
+                INTERVAL_TO_INDEX,
+                inputDir,
+                TEST_FILE_NAME_PREFIX + "*",
+                new SingleDimensionPartitionsSpec(
+                    targetRowsPerSegment,
+                    null,
+                    DIM1,
+                    false
+                ),
+                2,
+                false,
+                0
+        );
+
+    final TaskActionClient actionClient = createActionClient(task);
+    final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
+
+    prepareTaskForLocking(task);
+    Assert.assertTrue(task.isReady(actionClient));
+    task.stopGracefully(null);
+
+
+    TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);
+
+    Assert.assertTrue(taskStatus.isFailure());
+    Assert.assertEquals(
+        "Failed in phase[TestRunner[false]]. See task logs for details.",

Review comment:
       Same comment for the phase name.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -555,14 +559,23 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
     );
 
     final TaskState state = runNextPhase(runner);
+    TaskStatus taskStatus;
     if (state.isSuccess()) {
       //noinspection ConstantConditions
       publishSegments(toolbox, runner.getReports());
       if (awaitSegmentAvailabilityTimeoutMillis > 0) {
         waitForSegmentAvailability(runner.getReports());
       }
+      taskStatus = TaskStatus.success(getId());
+    } else {
+      // there is only success or failure after running....
+      Preconditions.checkState(state.isFailure());

Review comment:
       Is this meant to future-proof against new states that could be reached from running, other than success and failure? It would be nice to include a message saying what the state actually was.
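
       A minimal sketch of what such a message could look like. The class, enum, and method names below are hypothetical stand-ins rather than the PR's code, and the message wording is only a suggestion:

```java
// Hypothetical stand-in for the PR's state check; not the actual Druid code.
class StateCheckSketch
{
  // Simplified stand-in for org.apache.druid.indexer.TaskState.
  enum TaskState
  {
    RUNNING, SUCCESS, FAILED;

    boolean isSuccess()
    {
      return this == SUCCESS;
    }

    boolean isFailure()
    {
      return this == FAILED;
    }
  }

  // Returns true on success and false on failure; any other state is
  // reported by name in the exception message.
  static boolean succeeded(TaskState state)
  {
    if (state.isSuccess()) {
      return true;
    }
    if (!state.isFailure()) {
      throw new IllegalStateException("Unexpected state after running a phase: [" + state + "]");
    }
    return false;
  }
}
```

       In the real method, Guava's Preconditions.checkState(boolean, String, Object...) overload could presumably carry the same information, since it accepts a message template with %s placeholders.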

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Force and verify the failure modes for hash partitioning task
+ */
+public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
+{
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+  );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim1", "dim2", "val"),
+      null,
+      false,
+      false,
+      0
+  );
+  private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
+
+  private File inputDir;
+
+
+  public HashPartitionTaskKillTest()
+  {
+    super(LockGranularity.TIME_CHUNK, true, 0, 0);
+  }
+
+  @Before
+  public void setup() throws IOException
+  {
+    inputDir = temporaryFolder.newFolder("data");
+    final Set<Interval> intervals = new HashSet<>();
+    // set up data
+    for (int i = 0; i < 10; i++) {
+      try (final Writer writer =
+               Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+        for (int j = 0; j < 10; j++) {
+          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
+          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
+        }
+      }
+    }
+
+    for (int i = 0; i < 5; i++) {
+      try (final Writer writer =
+               Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
+        writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
+      }
+    }
+    // sorted input intervals
+    List<Interval> inputIntervals = new ArrayList<>(intervals);
+    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
+  }
+
+  @Test(timeout = 5000L)
+  public void failsInFirstPhase() throws Exception
+  {
+    final ParallelIndexSupervisorTask task =
+        newTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, null, INTERVAL_TO_INDEX, inputDir,
+                "test_*",
+                new HashedPartitionsSpec(null, null, // num shards is null to force it to go to first phase
+                                         ImmutableList.of("dim1", "dim2")
+                ),
+                2, false, true, 0
+        );
+
+    final TaskActionClient actionClient = createActionClient(task);
+    final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
+
+    prepareTaskForLocking(task);
+    Assert.assertTrue(task.isReady(actionClient));
+    task.stopGracefully(null);
+
+    TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);
+
+    Assert.assertTrue(taskStatus.isFailure());
+    Assert.assertEquals(
+        "Failed in phase[TestRunner[false]]. See task logs for details.",

Review comment:
       I can't tell which phase failed because the phase name is always the same. I suggest naming each phase differently so that these tests can verify the exact message.
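
       A rough sketch of the idea, assuming each test runner can be given its own name. NamedPhaseRunner, runPhases, and the phase names are hypothetical; only the message format is taken from the assertion above:

```java
// Hypothetical sketch; the PR's actual TestRunner may look different.
class PhaseNameSketch
{
  // A test phase runner that carries a distinct name and a fixed outcome.
  static final class NamedPhaseRunner
  {
    private final String phaseName;
    private final boolean succeeds;

    NamedPhaseRunner(String phaseName, boolean succeeds)
    {
      this.phaseName = phaseName;
      this.succeeds = succeeds;
    }

    String getName()
    {
      return phaseName;
    }

    boolean run()
    {
      return succeeds;
    }
  }

  // Runs the phases in order and returns the error message of the first
  // failing phase, or null if every phase succeeds.
  static String runPhases(NamedPhaseRunner... runners)
  {
    for (NamedPhaseRunner runner : runners) {
      if (!runner.run()) {
        return "Failed in phase[" + runner.getName() + "]. See task logs for details.";
      }
    }
    return null;
  }
}
```

       With distinct names, a test that forces the second phase to fail could assert on "Failed in phase[PHASE-2]. See task logs for details." and would no longer pass if the first phase failed instead.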




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


