You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/09 07:01:00 UTC

[GitHub] [druid] LakshSingla opened a new pull request, #13062: Use worker number instead of task id in MSQ for communication to/from workers.

LakshSingla opened a new pull request, #13062:
URL: https://github.com/apache/druid/pull/13062

   ### Description
   
   In the current version of MSQ, workers are bound to a specific task. This is not cool for a fault tolerant engine where worker tasks can be ephemeral, and not bound to a specific task id, and can be respawned.in case the task they are bound to fails.
   This PR introduces a set of changes and reactors which makes the following possible:
   1. WorkerClient uses workerNumber instead of taskId to communicate between the workerTasks, abstracting away the complexity of resolving the workerNumber to the taskId from the callers. 
   2. `WorkerImpl` doesn't memoize the task location for the worker tasks and instead fetches it from the `ControllerClient` every time it wants to contact the worker.
   
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are corner cases and error conditions handled, such as when there are insufficient resources?
    - Class organization and design (how the logic is split between classes, inheritance, composition, design patterns)
    - Method organization and design (how the logic is split between methods, parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative name) for every design (or naming) decision point and compare the alternatives with the designs that you've implemented (or the names you've chosen) to highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in the development mailing list), link to that discussion from this PR description and explain what have changed in your final design compared to your original proposal or the consensus version in the end of the discussion. If something hasn't changed since the original discussion, you can omit a detailed discussion of those aspects of the design here, perhaps apart from brief mentioning for the sake of readability of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small changes. -->
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `MyFoo`
    * `OurBar`
    * `TheirBaz`
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r974231888


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,59 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",
+          successfulFilePath,
+          workerNo,
+          stageNumber,
+          partitionNumber
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      // TODO: Check if this call can block indefinitely
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);

Review Comment:
   Need a review on this, and if it can potentially block the output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r1005249811


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -888,7 +895,33 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
+            }
+
+            if (durableStageStorageEnabled) {
+              // Once the outputs channels have been resolved and are ready for reading, the worker appends the filename
+              // with a special marker flag and adds it to the
+              DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+                  DurableStorageOutputChannelFactory.createStandardImplementation(
+                      task.getControllerTaskId(),
+                      task().getWorkerNumber(),
+                      stageDef.getStageNumber(),
+                      task().getId(),
+                      frameContext.memoryParameters().getStandardFrameSize(),
+                      MSQTasks.makeStorageConnector(context.injector())
+                  );
+              try {
+                durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+              }
+              catch (IOException e) {
+                throw new ISE(
+                    e,
+                    "Unable to suffix the file with %s",

Review Comment:
   Updated the messaging. Attaching the path location instead of adding id and stage individually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r973916496


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   Consider the scenario where:
   1. File with task id which starts with B is written at t0
   2. The same partition with task id which starts with A is written at t5
   
   The alphabetically sorted order will give different results at t4 and t6 which is what we would want to avoid. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r972809066


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java:
##########
@@ -112,31 +132,52 @@ public OutputChannel openNilChannel(int partitionNumber)
       throw new ISE(
           e,
           "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          workerNumber,
           stageNumber,
           partitionNumber
       );
     }
   }
 
+  public void markAsSuccess(int partitionNumber) throws IOException
+  {
+    String oldPath = getPartitionOutputFileNameForTask(controllerTaskId, workerNumber, stageNumber, partitionNumber, taskId);
+    storageConnector.moveFile(oldPath, StringUtils.format("%s%s", oldPath, SUCCESSFUL_SUFFIX));

Review Comment:
   I donot think there is support from AWS regarding this. So its more of a backend thing rather than a client thing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r985391069


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,59 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",
+          successfulFilePath,
+          workerNo,
+          stageNumber,
+          partitionNumber
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      // TODO: Check if this call can block indefinitely
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);

Review Comment:
   I think this should be fine. As the success file would be less than 100 bytes, we should be cool with it. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java:
##########
@@ -72,7 +72,7 @@ ListenableFuture<Void> postResultPartitionBoundaries(
    * kind of unrecoverable exception).
    */
   ListenableFuture<Boolean> fetchChannelData(
-      String workerTaskId,
+      int workerNumber,

Review Comment:
   IMO, the logic of which worker task to contact should be built outside this client as we would never want half the requests going to older workers, when worker fault tolerance is there, and half of them to the new workers. 
   Hence, I think we should not touch `extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java` this class. 
   
   Thoughts?
   



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1086,6 +1091,65 @@ public void testGroupByMultiValueMeasureQuery()
         .verifyResults();
   }
 
+  @Test
+  public void testGroupByOnFooWithDurableStoragePathAssertions()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("cnt", ColumnType.LONG)
+                                            .add("cnt1", ColumnType.LONG)
+                                            .build();
+
+    ExecutorService executorService = Execs.singleThreaded("path-verifier");
+    final AtomicBoolean existsOnce = new AtomicBoolean(false);
+    executorService.submit(() -> {
+      while (true) {
+        File successFile = new File(

Review Comment:
   This logic is very racy. Can we intercept calls on the local file storage connector and then assert on the paths ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on PR #13062:
URL: https://github.com/apache/druid/pull/13062#issuecomment-1250817660

   I have updated the code in the `WorkerImpl` to do the following (when the durable storage is enabled):
   Once the data for all the partitions have been generated it does the following things:
   1. Write the data into the path "controller_task_id/stage_a/worker_b/taskId_c/part_d". (Note: there is a slight change from the existing path structure).
   2. Checks the folder "controller_task_id/stage_a/worker_b" for a file named "__success". If present, then it does nothing, else it will write it's task Id in that file.
   
   While reading the data for a particular stage, worker number and partition, the worker then does the following:
   1. Checks the folder "controller_task_id/stage_a/worker_b" for a file named "__success" and attempts to read the task id of the task which successfully wrote to it.
   2. If unable to read it or the file is not present, the worker throws an error.
   3. Else it will fetch the task id present there and then read the data from the location: "controller_task_id/stage_a/worker_b/taskId_c/part_d" where the task id was fetched as above. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r973871394


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   Thanks! As per your original suggestions, I like the time-based approach. Another one that I can think of is picking up the first file in alphabetically sorted order. 
   If we are using one of the above methods, then I see no inherent benefits in using the controller to figure out the file in case of a tiebreaker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r995354141


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java:
##########
@@ -72,7 +72,7 @@ ListenableFuture<Void> postResultPartitionBoundaries(
    * kind of unrecoverable exception).
    */
   ListenableFuture<Boolean> fetchChannelData(
-      String workerTaskId,
+      int workerNumber,

Review Comment:
   The worker-client needs to be a simple client. Given a taskId, do the required operation. 
   I think that would be a more manageable contract for the users of the client as it is deterministic. 
   When the msqWokerTaskLauncher changes something in the background, we do not want an operation to succeed on a worker in the wrong state. 
   
   Users will always have to look up the taskID and get the worker state which is on the worker number. 
   As we have async api's it can be very well possible that we end up getting success from a task that according to the controller state is in retrying. ie the worker is in retrying. 
   In such a case we simply have to ignore the state change which becomes hard if we have only the worker id as worker states are on the worker number. 
   
   Another reason which tipped me over towards the taskID approach is that worker-side client interface changes are not required in the worker JVM. 
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe merged pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe merged PR #13062:
URL: https://github.com/apache/druid/pull/13062


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r974235779


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -856,7 +875,33 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
             }
+
+            if (durableStageStorageEnabled) {
+              // Once the outputs channels have been resolved and are ready for reading, the worker appends the filename
+              // with a special marker flag and adds it to the
+              DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+                  DurableStorageOutputChannelFactory.createStandardImplementation(
+                      task.getControllerTaskId(),
+                      task().getWorkerNumber(),
+                      stageDef.getStageNumber(),
+                      task().getId(),
+                      frameContext.memoryParameters().getStandardFrameSize(),
+                      MSQTasks.makeStorageConnector(context.injector())
+                  );
+              try {
+                durableStorageOutputChannelFactory.createSuccessFile(task.getId());

Review Comment:
   Another place where I wanted to check is if a race condition can happen if multiple workers are simultaneously executing this piece of code. Is there any guarantee in the storage connector that ensures that the final file will contain the input from a single worker? I have added a check in the `createSuccessFile` that checks if the file is already created or not for starters. If the write operation for the connector is atomic, I suppose that we should be fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r970681924


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -856,7 +874,37 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
+              if (durableStageStorageEnabled) {
+                // Mark the stuff as __success there

Review Comment:
   Nit: Lets improve this comment. 
   We want to be sure that we only read when the write is fully finished hence add a SUCCESS file 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   So we can have issues here. 
   This logic is not deterministic. 
   Lets say we have a case where one zombie task also wrote its output successfully to durable storage. This means, the output of that task will also be considered to be read 
   So now you have 
   `/xxx/workerNumber/task_good_id/part_1_success`
   `/xxx/workerNumber/task_zombie_id/part_1_success`
   
   As rows in both of them may not follow the same order we might be in a soup if worker one reads zombie_task files and worker 2 reads good_id files. 
   
   
   The best fix would be to figure out which path to read the data from at one place IE in the controller. 
   
   If we are not able to do that then we have to make some changes into how the correct taskID files are picked. 
   
   I suggest using time as the tiebreaker. Which ever successful file is written first, we use that. Basically, first write wins. 
   
   
   
   cc @gianm 
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java:
##########
@@ -112,31 +132,52 @@ public OutputChannel openNilChannel(int partitionNumber)
       throw new ISE(
           e,
           "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          workerNumber,
           stageNumber,
           partitionNumber
       );
     }
   }
 
+  public void markAsSuccess(int partitionNumber) throws IOException
+  {
+    String oldPath = getPartitionOutputFileNameForTask(controllerTaskId, workerNumber, stageNumber, partitionNumber, taskId);
+    storageConnector.moveFile(oldPath, StringUtils.format("%s%s", oldPath, SUCCESSFUL_SUFFIX));

Review Comment:
   That way we can also use the timestamp of that file for tiebreaker. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java:
##########
@@ -112,31 +132,52 @@ public OutputChannel openNilChannel(int partitionNumber)
       throw new ISE(
           e,
           "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          workerNumber,
           stageNumber,
           partitionNumber
       );
     }
   }
 
+  public void markAsSuccess(int partitionNumber) throws IOException
+  {
+    String oldPath = getPartitionOutputFileNameForTask(controllerTaskId, workerNumber, stageNumber, partitionNumber, taskId);
+    storageConnector.moveFile(oldPath, StringUtils.format("%s%s", oldPath, SUCCESSFUL_SUFFIX));

Review Comment:
   This is a very expensive call in s3. IIRC you basically copy from old path to new path. Its not just a metadata update as in the case of HDFS. 
   
   IMHO we might need a better approach here. 
   
   Just create a __SUCCESS file in the dir ?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -856,7 +874,37 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
+              if (durableStageStorageEnabled) {
+                // Mark the stuff as __success there
+                DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+                    DurableStorageOutputChannelFactory.createStandardImplementation(
+                        task.getControllerTaskId(),
+                        task().getWorkerNumber(),
+                        stageDef.getStageNumber(),
+                        task().getId(),
+                        frameContext.memoryParameters().getStandardFrameSize(),
+                        MSQTasks.makeStorageConnector(context.injector())
+                    );
+                try {
+                  durableStorageOutputChannelFactory.markAsSuccess(channel.getPartitionNumber());

Review Comment:
   Looks like you are marking the file successfull twice accidentally. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r974231888


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,59 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",
+          successfulFilePath,
+          workerNo,
+          stageNumber,
+          partitionNumber
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      // TODO: Check if this call can block indefinitely
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);

Review Comment:
   Wanted to call this out, and check if it can potentially block the execution indefinitely and if there is a better option for the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r972641028


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java:
##########
@@ -112,31 +132,52 @@ public OutputChannel openNilChannel(int partitionNumber)
       throw new ISE(
           e,
           "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          workerNumber,
           stageNumber,
           partitionNumber
       );
     }
   }
 
+  public void markAsSuccess(int partitionNumber) throws IOException
+  {
+    String oldPath = getPartitionOutputFileNameForTask(controllerTaskId, workerNumber, stageNumber, partitionNumber, taskId);
+    storageConnector.moveFile(oldPath, StringUtils.format("%s%s", oldPath, SUCCESSFUL_SUFFIX));

Review Comment:
   Any alternatives for the S3 client? The current implementation has copy and delete in S3. I didn't find a rename operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r995354141


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java:
##########
@@ -72,7 +72,7 @@ ListenableFuture<Void> postResultPartitionBoundaries(
    * kind of unrecoverable exception).
    */
   ListenableFuture<Boolean> fetchChannelData(
-      String workerTaskId,
+      int workerNumber,

Review Comment:
   The worker-client needs to be a simple client. Given a taskId, do the required operation. 
   I think that would be a more manageable contract for the users of the client as it is deterministic. 
   When the msqWokerTaskLauncher changes something in the background, we do not want an operation to succeed on a worker in the wrong state. 
   
   Users will always have to look up the taskID and get the worker state which is on the worker number. 
   As we have async api's it can be very well possible that we end up getting success from a task that according to the controller state is in retrying. ie the worker is in retrying. 
   In such a case we simply have to ignore the state change which becomes hard if we have only the worker id and not the taskId. We won't know if the taskID is to be ignored or not as the worker state in the controller is on worker id. 
   
   Another reason which tipped me over towards the taskID approach is that worker-side client interface changes are not required in the worker JVM. 
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on PR #13062:
URL: https://github.com/apache/druid/pull/13062#issuecomment-1301638954

   Thanks for the contribution @LakshSingla 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on PR #13062:
URL: https://github.com/apache/druid/pull/13062#issuecomment-1245434177

   Undrafted the PR, S3 storage connector `ls()` needs to be implemented. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r997284741


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java:
##########
@@ -36,6 +36,8 @@
 {
   private final String taskId;
   @Nullable
+  private final Integer workerNumber;

Review Comment:
   Is this still being used ?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java:
##########
@@ -112,31 +133,28 @@ public OutputChannel openNilChannel(int partitionNumber)
       throw new ISE(
           e,
           "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          workerNumber,
           stageNumber,
           partitionNumber
       );
     }
   }
 
-  public static String getControllerDirectory(final String controllerTaskId)
-  {
-    return StringUtils.format("controller_%s", IdUtils.validateId("controller task ID", controllerTaskId));
-  }
-
-  public static String getPartitionFileName(
-      final String controllerTaskId,
-      final String workerTaskId,
-      final int stageNumber,
-      final int partitionNumber
-  )
+  /**
+   * Creates a file with name __success and adds the worker's id which has successfully written its outputs. While reading
+   * this file can be used to find out the worker which has written its outputs completely.
+   * Rename operation is not very quick in cloud storage like S3 due to which this alternative
+   * route has been taken
+   */
+  public void createSuccessFile(String taskId) throws IOException

Review Comment:
   We should also mention that if the path already exists this operation is a no op. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,58 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",
+          successfulFilePath,
+          workerNo,
+          stageNumber,
+          partitionNumber
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    if (successfulTaskId == null) {
+      throw new ISE("Unable to read the task id from the file: [%s]", successfulFilePath);
+    }
+

Review Comment:
   Let's debug log the successfull taskId ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r972601363


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -856,7 +874,37 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
+              if (durableStageStorageEnabled) {
+                // Mark the stuff as __success there
+                DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+                    DurableStorageOutputChannelFactory.createStandardImplementation(
+                        task.getControllerTaskId(),
+                        task().getWorkerNumber(),
+                        stageDef.getStageNumber(),
+                        task().getId(),
+                        frameContext.memoryParameters().getStandardFrameSize(),
+                        MSQTasks.makeStorageConnector(context.injector())
+                    );
+                try {
+                  durableStorageOutputChannelFactory.markAsSuccess(channel.getPartitionNumber());

Review Comment:
   Thanks for pointing it out! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r973866841


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   As we are downloading segments in different threads, outputs are not guaranteed to be same. 
   There can be a case where one parent of the dag is read by two child nodes and then we want to pass the same outputs to both the cases especially if we do limit/offset. Also, a deterministic system is always easier to debug.  
   ```
      ()
    /    \
   ()    ()
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r1005246893


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -888,7 +895,33 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+
+            }
+
+            if (durableStageStorageEnabled) {
+              // Once the outputs channels have been resolved and are ready for reading, the worker appends the filename
+              // with a special marker flag and adds it to the
+              DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+                  DurableStorageOutputChannelFactory.createStandardImplementation(
+                      task.getControllerTaskId(),
+                      task().getWorkerNumber(),
+                      stageDef.getStageNumber(),
+                      task().getId(),
+                      frameContext.memoryParameters().getStandardFrameSize(),
+                      MSQTasks.makeStorageConnector(context.injector())
+                  );
+              try {
+                durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+              }
+              catch (IOException e) {
+                throw new ISE(
+                    e,
+                    "Unable to suffix the file with %s",

Review Comment:
   How about changing this to : "Unable to create the success file for task: xx , stage : yy , worker: zz" ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cryptoe commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
cryptoe commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r1004188553


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java:
##########
@@ -32,19 +32,22 @@ public class MSQWarningReportSimplePublisher implements MSQWarningReportPublishe
 {
 
   final String workerId;
+  final int workerNumber;

Review Comment:
   do we still need workerID and workerNumber ?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,64 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",

Review Comment:
   ```suggestion
             "No file present at the location [%s]. Unable to read the output of worker: [%d], stage: [%d], partition: [%d]",
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +118,64 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the inputs for worker: [%d], stage: [%d], partition: [%d]",
+          successfulFilePath,
+          workerNo,
+          stageNumber,
+          partitionNumber
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    if (successfulTaskId == null) {
+      throw new ISE("Unable to read the task id from the file: [%s]", successfulFilePath);
+    }
+    LOG.info(
+        "Successful task id for stage [%d] and partition [%d]: [%s]",
+        stageNumber,
+        partitionNumber,
+        successfulTaskId
+    );

Review Comment:
   ```suggestion
       LOG.debug(
           "Reading output of stage [%d] and partition [%d] from task id [%s]", 
           stageNumber,
           partitionNumber,
           successfulTaskId
       );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r990926670


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java:
##########
@@ -72,7 +72,7 @@ ListenableFuture<Void> postResultPartitionBoundaries(
    * kind of unrecoverable exception).
    */
   ListenableFuture<Boolean> fetchChannelData(
-      String workerTaskId,
+      int workerNumber,

Review Comment:
   I think `WorkerClient` should be refactored to include the worker number to save callers the complexity of resolving the ID to number. In the current indexer's implementation, this is an implementation detail that allows the logic to be wrapped up in the implementation class. 
   The behavior based on fault tolerance would change in the controller task only (as the worker tasks won't be communicating with each other in fault-tolerant mode). In the controller task, irrespective of whether the fault tolerance is enabled, this would resolve to whatever is present in the `MSQWorkerTaskLauncher`, which would ultimately be responsible for determining where the call to a specific worker number should resolve.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r972603949


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   Thinking about it further, even if the order is different, however, the content is the same, it shouldn't matter since a single worker is assigned to a single partition, right?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] LakshSingla commented on a diff in pull request #13062: Use worker number instead of task id in MSQ for communication to/from workers.

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13062:
URL: https://github.com/apache/druid/pull/13062#discussion_r972600501


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   > As rows in both of them may not follow the same order we might be in a soup if worker one reads zombie_task files and worker 2 reads good_id files.
   
   I was under the assumption that the row ordering in both of the successful writes should be the same.
   Considering a non shuffling case, it doesn't change the row order, therefore the output of the successful write should be identical to the input (which if we track back to the original input source, should have a fixed row order.
   In a shuffling case, we might sort the rows; therefore, the output must be similar to the sort done. I think there might be some indeterminism there (which I doubt considering that the FrameChannelMerger for both workers should produce the same output), but when we read the sorted data, we pass it through the `FrameChannelMerger` so as long as the rows are identical (and not the order) in the files we shouldn't have an issue.
   
   This is all considering that the successful files have different outputs, which I don't think should be the case 🤔. WDYT?  



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java:
##########
@@ -112,11 +119,33 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Could not find remote output of worker task[%d] stage[%d] partition[%d]",
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
     }
   }
+
+  @Nullable
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    List<String> fileNames = storageConnector.lsFiles(
+        DurableStorageOutputChannelFactory.getPartitionOutputsFolderName(
+            controllerTaskId,
+            workerNo,
+            stageNumber,
+            partitionNumber
+        )
+    );
+    Optional<String> maybeFileName = fileNames.stream()

Review Comment:
   > As rows in both of them may not follow the same order we might be in a soup if worker one reads zombie_task files and worker 2 reads good_id files.
   
   I was under the assumption that the row ordering in both of the successful writes should be the same.
   
   Considering a non shuffling case, it doesn't change the row order, therefore the output of the successful write should be identical to the input (which if we track back to the original input source, should have a fixed row order.
   
   In a shuffling case, we might sort the rows; therefore, the output must be similar to the sort done. I think there might be some indeterminism there (which I doubt considering that the FrameChannelMerger for both workers should produce the same output), but when we read the sorted data, we pass it through the `FrameChannelMerger` so as long as the rows are identical (and not the order) in the files we shouldn't have an issue.
   
   This is all considering that the successful files have different outputs, which I don't think should be the case 🤔. WDYT?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org