You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/10/01 22:33:26 UTC

[GitHub] gianm closed pull request #6334: make 0.13 tasks API backwards compatible with 0.12 (#6333)

gianm closed pull request #6334: make 0.13 tasks API backwards compatible with 0.12  (#6333)
URL: https://github.com/apache/incubator-druid/pull/6334
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
index 82c76b6867a..e58d83111f1 100644
--- a/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
+++ b/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -29,11 +31,13 @@
 
 public class TaskStatusPlus
 {
+  private static final Logger log = new Logger(TaskStatusPlus.class);
+
   private final String id;
   private final String type;
   private final DateTime createdTime;
   private final DateTime queueInsertionTime;
-  private final TaskState state;
+  private final TaskState statusCode;
   private final RunnerTaskState runnerTaskState;
   private final Long duration;
   private final TaskLocation location;
@@ -42,13 +46,43 @@
   @Nullable
   private final String errorMsg;
 
+  public TaskStatusPlus(
+      String id,
+      String type, // nullable for backward compatibility
+      DateTime createdTime,
+      DateTime queueInsertionTime,
+      @Nullable TaskState statusCode,
+      @Nullable RunnerTaskState runnerStatusCode,
+      @Nullable Long duration,
+      TaskLocation location,
+      @Nullable String dataSource, // nullable for backward compatibility
+      @Nullable String errorMsg
+  )
+  {
+    this(
+        id,
+        type,
+        createdTime,
+        queueInsertionTime,
+        statusCode,
+        statusCode,
+        runnerStatusCode,
+        duration,
+        location,
+        dataSource,
+        errorMsg
+    );
+  }
+
+
   @JsonCreator
   public TaskStatusPlus(
       @JsonProperty("id") String id,
       @JsonProperty("type") @Nullable String type, // nullable for backward compatibility
       @JsonProperty("createdTime") DateTime createdTime,
       @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
-      @JsonProperty("statusCode") @Nullable TaskState state,
+      @JsonProperty("statusCode") @Nullable TaskState statusCode,
+      @Deprecated @JsonProperty("status") @Nullable TaskState status,  // present for backwards compatibility
       @JsonProperty("runnerStatusCode") @Nullable RunnerTaskState runnerTaskState,
       @JsonProperty("duration") @Nullable Long duration,
       @JsonProperty("location") TaskLocation location,
@@ -56,14 +90,24 @@ public TaskStatusPlus(
       @JsonProperty("errorMsg") @Nullable String errorMsg
   )
   {
-    if (state != null && state.isComplete()) {
+    if (statusCode != null && statusCode.isComplete()) {
       Preconditions.checkNotNull(duration, "duration");
     }
     this.id = Preconditions.checkNotNull(id, "id");
     this.type = type;
     this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
     this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime");
-    this.state = state;
+    //checks for deserialization safety
+    if (statusCode != null && status == null) {
+      this.statusCode = statusCode;
+    } else if (statusCode == null && status != null) {
+      this.statusCode = status;
+    } else {
+      if (statusCode != null && status != null && statusCode != status) {
+        throw new RuntimeException(StringUtils.format("statusCode[%s] and status[%s] must match", statusCode, status));
+      }
+      this.statusCode = statusCode;
+    }
     this.runnerTaskState = runnerTaskState;
     this.duration = duration;
     this.location = Preconditions.checkNotNull(location, "location");
@@ -98,14 +142,22 @@ public DateTime getQueueInsertionTime()
 
   @Nullable
   @JsonProperty("statusCode")
-  public TaskState getState()
+  public TaskState getStatusCode()
+  {
+    return statusCode;
+  }
+
+  @Deprecated
+  @Nullable
+  @JsonProperty("status")
+  public TaskState getStatus()
   {
-    return state;
+    return statusCode;
   }
 
   @Nullable
   @JsonProperty("runnerStatusCode")
-  public RunnerTaskState getRunnerTaskState()
+  public RunnerTaskState getRunnerStatusCode()
   {
     return runnerTaskState;
   }
@@ -150,7 +202,7 @@ public boolean equals(Object o)
            Objects.equals(getType(), that.getType()) &&
            Objects.equals(getCreatedTime(), that.getCreatedTime()) &&
            Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) &&
-           getState() == that.getState() &&
+           getStatusCode() == that.getStatusCode() &&
            Objects.equals(getDuration(), that.getDuration()) &&
            Objects.equals(getLocation(), that.getLocation()) &&
            Objects.equals(getDataSource(), that.getDataSource()) &&
@@ -165,7 +217,7 @@ public int hashCode()
         getType(),
         getCreatedTime(),
         getQueueInsertionTime(),
-        getState(),
+        getStatusCode(),
         getDuration(),
         getLocation(),
         getDataSource(),
@@ -181,7 +233,7 @@ public String toString()
            ", type='" + type + '\'' +
            ", createdTime=" + createdTime +
            ", queueInsertionTime=" + queueInsertionTime +
-           ", state=" + state +
+           ", statusCode=" + statusCode +
            ", duration=" + duration +
            ", location=" + location +
            ", dataSource='" + dataSource + '\'' +
diff --git a/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java b/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java
index 47dce44b91f..f99ea0acbd5 100644
--- a/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java
+++ b/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java
@@ -61,6 +61,45 @@ public void testSerde() throws IOException
     Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class));
   }
 
+  @Test
+  public void testJsonAttributes() throws IOException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.registerModule(
+        new SimpleModule()
+            .addDeserializer(DateTime.class, new DateTimeDeserializer())
+            .addSerializer(DateTime.class, ToStringSerializer.instance)
+    );
+    final String json = "{\n"
+                        + "\"id\": \"testId\",\n"
+                        + "\"type\": \"testType\",\n"
+                        + "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n"
+                        + "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n"
+                        + "\"statusCode\": \"RUNNING\",\n"
+                        + "\"status\": \"RUNNING\",\n"
+                        + "\"runnerStatusCode\": \"RUNNING\",\n"
+                        + "\"duration\": 1000,\n"
+                        + "\"location\": {\n"
+                        + "\"host\": \"testHost\",\n"
+                        + "\"port\": 1010,\n"
+                        + "\"tlsPort\": -1\n"
+                        + "},\n"
+                        + "\"dataSource\": \"ds_test\",\n"
+                        + "\"errorMsg\": null\n"
+                        + "}";
+    TaskStatusPlus taskStatusPlus = mapper.readValue(json, TaskStatusPlus.class);
+    Assert.assertNotNull(taskStatusPlus);
+    Assert.assertNotNull(taskStatusPlus.getStatusCode());
+    Assert.assertTrue(taskStatusPlus.getStatusCode().isRunnable());
+    Assert.assertNotNull(taskStatusPlus.getRunnerStatusCode());
+
+    String serialized = mapper.writeValueAsString(taskStatusPlus);
+
+    Assert.assertTrue(serialized.contains("\"status\":"));
+    Assert.assertTrue(serialized.contains("\"statusCode\":"));
+    Assert.assertTrue(serialized.contains("\"runnerStatusCode\":"));
+  }
+
   // Copied from org.apache.druid.jackson.JodaStuff
   private static class DateTimeDeserializer extends StdDeserializer<DateTime>
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java
index f80163037fa..cc480bcf9c0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java
@@ -50,6 +50,6 @@ public TaskLocation getTaskLocation(String id)
     final TaskStatusResponse response = client.getTaskStatus(id);
     return response == null ?
            Optional.absent() :
-           Optional.of(TaskStatus.fromCode(id, response.getStatus().getState()));
+           Optional.of(TaskStatus.fromCode(id, response.getStatus().getStatusCode()));
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java
index 9bce0732759..48161e9cc5c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java
@@ -39,10 +39,10 @@
   {
     attemptHistory.forEach(status -> {
       Preconditions.checkState(
-          status.getState() == TaskState.SUCCESS || status.getState() == TaskState.FAILED,
+          status.getStatusCode() == TaskState.SUCCESS || status.getStatusCode() == TaskState.FAILED,
           "Complete tasks should be recorded, but the state of task[%s] is [%s]",
           status.getId(),
-          status.getState()
+          status.getStatusCode()
       );
     });
     this.spec = spec;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 91c9957787c..87440119ff0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -115,7 +115,7 @@ public void start(long taskStatusCheckingPeriod)
                 final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
                 final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
                 if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getState(), "taskState")) {
+                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
                     case SUCCESS:
                       incrementNumSucceededTasks();
 
@@ -152,7 +152,7 @@ public void start(long taskStatusCheckingPeriod)
                       monitorEntry.updateStatus(taskStatus);
                       break;
                     default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getState(), taskId);
+                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
                   }
                 }
               }
@@ -459,7 +459,7 @@ private SubTaskCompleteEvent(
 
     TaskState getLastState()
     {
-      return lastStatus == null ? TaskState.FAILED : lastStatus.getState();
+      return lastStatus == null ? TaskState.FAILED : lastStatus.getStatusCode();
     }
 
     @Nullable
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 8a7503b837b..ff3caedfab5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -364,7 +364,7 @@ private void checkState(
         .filter(entry -> {
           final TaskStatusPlus currentStatus = entry.getValue().getCurrentStatus();
           return currentStatus != null &&
-                 (currentStatus.getState() == TaskState.SUCCESS || currentStatus.getState() == TaskState.FAILED);
+                 (currentStatus.getStatusCode() == TaskState.SUCCESS || currentStatus.getStatusCode() == TaskState.FAILED);
         })
         .map(Entry::getKey)
         .findFirst()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 80cbe816a63..7e86e4072fc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -87,7 +87,7 @@ public void testBasic() throws InterruptedException, ExecutionException, Timeout
       Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId());
       Assert.assertEquals("specId" + i, result.getSpec().getId());
       Assert.assertNotNull(result.getLastStatus());
-      Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState());
+      Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
     }
   }
@@ -113,7 +113,7 @@ public void testRetry() throws InterruptedException, ExecutionException, Timeout
       Assert.assertEquals("specId" + i, result.getSpec().getId());
 
       Assert.assertNotNull(result.getLastStatus());
-      Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState());
+      Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
 
       final TaskHistory<TestTask> taskHistory = monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId());
@@ -122,8 +122,8 @@ public void testRetry() throws InterruptedException, ExecutionException, Timeout
       final List<TaskStatusPlus> attemptHistory = taskHistory.getAttemptHistory();
       Assert.assertNotNull(attemptHistory);
       Assert.assertEquals(3, attemptHistory.size());
-      Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getState());
-      Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getState());
+      Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getStatusCode());
+      Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getStatusCode());
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 85302bda5fe..7964c764069 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -881,6 +881,8 @@ public void testGetTaskStatus() throws Exception
         TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
         TaskStatusResponse.class
     );
+    TaskStatusPlus tsp = taskStatusResponse1.getStatus();
+    Assert.assertEquals(tsp.getStatusCode(), tsp.getStatus());
     Assert.assertEquals(
         new TaskStatusResponse(
             "mytask",
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 1ff113c04bf..8c1f33a1fe7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -248,7 +248,7 @@ public void testOverlordRun() throws Exception
     Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
     Assert.assertEquals(
         TaskStatus.running(taskId_0).getStatusCode(),
-        ((TaskStatusResponse) response.getEntity()).getStatus().getState()
+        ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
     );
 
     // Simulate completion of task_0
@@ -296,7 +296,7 @@ private void waitForTaskStatus(String taskId, TaskState status) throws Interrupt
   {
     while (true) {
       Response response = overlordResource.getTaskStatus(taskId);
-      if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getState())) {
+      if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
         break;
       }
       Thread.sleep(10);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index eee9eaa75d9..a1d7e51b6f3 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -132,7 +132,7 @@ public TaskState getTaskStatus(String taskID)
           {
           }
       );
-      return taskStatusResponse.getStatus().getState();
+      return taskStatusResponse.getStatus().getStatusCode();
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 1d275f7feac..501cd5e1398 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -21,7 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskState;
 import org.joda.time.DateTime;
 
 public class TaskResponseObject
@@ -30,14 +30,14 @@
   private final String id;
   private final DateTime createdTime;
   private final DateTime queueInsertionTime;
-  private final TaskStatus status;
+  private final TaskState status;
 
   @JsonCreator
   private TaskResponseObject(
       @JsonProperty("id") String id,
       @JsonProperty("createdTime") DateTime createdTime,
       @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
-      @JsonProperty("status") TaskStatus status
+      @JsonProperty("status") TaskState status
   )
   {
     this.id = id;
@@ -65,7 +65,7 @@ public DateTime getQueueInsertionTime()
   }
 
   @SuppressWarnings("unused") // Used by Jackson serialization?
-  public TaskStatus getStatus()
+  public TaskState getStatus()
   {
     return status;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org