You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by gi...@apache.org on 2018/07/09 18:23:37 UTC

[incubator-druid] branch 0.12.2 updated: [Backport] Fix missing task type in task payload API (#5941)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new f3aeccf  [Backport] Fix missing task type in task payload API (#5941)
f3aeccf is described below

commit f3aeccf79a7427086e9084292ec9479a8c7fa7a2
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 11:23:34 2018 -0700

    [Backport] Fix missing task type in task payload API (#5941)
    
    * Fix missing task type in task payload API. (#5399)
    
    * Fix missing task type in task payload API.
    
    Apparently embedding a polymorphic object inside a Map<String, Object> is
    a bit too much for Jackson to serialize properly. Fix this by using
    wrapper classes.
    
    * Fix OverlordTest casts.
    
    * Remove import.
    
    * Remove unused imports.
    
    * Clarify comments.
    
    * fix test
---
 .../java/io/druid/indexing/common/TaskStatus.java  | 21 ++++++
 .../indexing/overlord/http/OverlordResource.java   | 41 +++++++----
 .../overlord/http/TaskPayloadResponse.java         | 83 +++++++++++++++++++++
 .../indexing/overlord/http/TaskStatusResponse.java | 84 ++++++++++++++++++++++
 .../overlord/http/OverlordResourceTest.java        | 66 ++++++++++++++++-
 .../druid/indexing/overlord/http/OverlordTest.java | 13 ++--
 6 files changed, 284 insertions(+), 24 deletions(-)

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
index e353595..10ffacc 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
@@ -153,4 +153,25 @@ public class TaskStatus
                   .add("duration", duration)
                   .toString();
   }
+
+  @Override
+  public boolean equals(Object o) // Equal when duration, id, and status all match.
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) { // requires the exact same runtime class
+      return false;
+    }
+    TaskStatus that = (TaskStatus) o;
+    return getDuration() == that.getDuration() &&
+           java.util.Objects.equals(getId(), that.getId()) && // NOTE(review): fully qualified, presumably to avoid clashing with another imported Objects class; confirm
+           status == that.status; // NOTE(review): identity comparison; assumes status is an enum or otherwise canonical; confirm
+  }
+
+  @Override
+  public int hashCode() // Hashes the same three values that equals() compares.
+  {
+    return java.util.Objects.hash(getId(), status, getDuration());
+  }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 4fa507e..8e4be31 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -169,7 +169,12 @@ public class OverlordResource
             }
             catch (EntryExistsException e) {
               return Response.status(Response.Status.BAD_REQUEST)
-                             .entity(ImmutableMap.of("error", StringUtils.format("Task[%s] already exists!", task.getId())))
+                             .entity(
+                                 ImmutableMap.of(
+                                     "error",
+                                     StringUtils.format("Task[%s] already exists!", task.getId())
+                                 )
+                             )
                              .build();
             }
           }
@@ -209,7 +214,16 @@ public class OverlordResource
   @ResourceFilters(TaskResourceFilter.class)
   public Response getTaskPayload(@PathParam("taskid") String taskid)
   {
-    return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
+    final TaskPayloadResponse response = new TaskPayloadResponse(
+        taskid,
+        taskStorageQueryAdapter.getTask(taskid).orNull()
+    );
+
+    final Response.Status status = response.getPayload() == null
+                                   ? Response.Status.NOT_FOUND
+                                   : Response.Status.OK;
+
+    return Response.status(status).entity(response).build();
   }
 
   @GET
@@ -218,7 +232,16 @@ public class OverlordResource
   @ResourceFilters(TaskResourceFilter.class)
   public Response getTaskStatus(@PathParam("taskid") String taskid)
   {
-    return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid));
+    final TaskStatusResponse response = new TaskStatusResponse(
+        taskid,
+        taskStorageQueryAdapter.getStatus(taskid).orNull()
+    );
+
+    final Response.Status status = response.getStatus() == null
+                                   ? Response.Status.NOT_FOUND
+                                   : Response.Status.OK;
+
+    return Response.status(status).entity(response).build();
   }
 
   @GET
@@ -654,18 +677,6 @@ public class OverlordResource
     );
   }
 
-  private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
-  {
-    final Map<String, Object> results = Maps.newHashMap();
-    results.put("task", taskid);
-    if (x.isPresent()) {
-      results.put(objectType, x.get());
-      return Response.status(Response.Status.OK).entity(results).build();
-    } else {
-      return Response.status(Response.Status.NOT_FOUND).entity(results).build();
-    }
-  }
-
   private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
   {
     if (x.isPresent()) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java
new file mode 100644
index 0000000..1e7b4e9
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexing.common.task.Task;
+
+import java.util.Objects;
+
+public class TaskPayloadResponse // Typed response entity for OverlordResource.getTaskPayload: a task ID plus the task itself.
+{
+  private final String task; // Task ID, named "task" in the JSONification of this class.
+  private final Task payload; // Null when the task was not found; getTaskPayload then answers NOT_FOUND.
+
+  @JsonCreator
+  public TaskPayloadResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("payload") final Task payload
+  )
+  {
+    this.task = task;
+    this.payload = payload;
+  }
+
+  @JsonProperty
+  public String getTask() // Returns the task ID.
+  {
+    return task;
+  }
+
+  @JsonProperty
+  public Task getPayload() // Returns the task, or null if none was supplied.
+  {
+    return payload;
+  }
+
+  @Override
+  public boolean equals(final Object o) // Equal when both the task ID and the payload are equal (null-safe).
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) { // requires the exact same runtime class
+      return false;
+    }
+    final TaskPayloadResponse that = (TaskPayloadResponse) o;
+    return Objects.equals(task, that.task) &&
+           Objects.equals(payload, that.payload);
+  }
+
+  @Override
+  public int hashCode() // Hashes the same two fields that equals() compares.
+  {
+    return Objects.hash(task, payload);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskPayloadResponse{" +
+           "task='" + task + '\'' +
+           ", payload=" + payload +
+           '}';
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java
new file mode 100644
index 0000000..b15aeb0
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexing.common.TaskStatus;
+
+import java.util.Objects;
+
+public class TaskStatusResponse // Typed response entity for OverlordResource.getTaskStatus: a task ID plus its status.
+{
+  private final String task; // Task ID, named "task" in the JSONification of this class.
+  private final TaskStatus status; // Null when no status was found; getTaskStatus then answers NOT_FOUND.
+
+  @JsonCreator
+  public TaskStatusResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("status") final TaskStatus status
+  )
+  {
+    this.task = task;
+    this.status = status;
+  }
+
+  @JsonProperty
+  public String getTask() // Returns the task ID.
+  {
+    return task;
+  }
+
+  @JsonProperty
+  public TaskStatus getStatus() // Returns the status, or null if none was supplied.
+  {
+    return status;
+  }
+
+  @Override
+  public boolean equals(final Object o) // Equal when both the task ID and the status are equal (null-safe).
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) { // requires the exact same runtime class
+      return false;
+    }
+    final TaskStatusResponse that = (TaskStatusResponse) o;
+    return Objects.equals(task, that.task) &&
+           Objects.equals(status, that.status);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // Hash the same two fields that equals() compares.
+    return Objects.hash(task, status);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskStatusResponse{" + // prefix spells the class name exactly, matching TaskPayloadResponse.toString()
+           "task='" + task + '\'' +
+           ", status=" + status +
+           '}';
+  }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
index 22b9e51..4264186 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -39,6 +39,7 @@ import io.druid.indexing.overlord.TaskRunner;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
 import io.druid.indexing.overlord.TaskStorageQueryAdapter;
 import io.druid.java.util.common.DateTimes;
+import io.druid.segment.TestHelper;
 import io.druid.server.security.Access;
 import io.druid.server.security.Action;
 import io.druid.server.security.AuthConfig;
@@ -87,7 +88,8 @@ public class OverlordResourceTest
         Optional.of(taskRunner)
     ).anyTimes();
 
-    AuthorizerMapper authMapper = new AuthorizerMapper(null) {
+    AuthorizerMapper authMapper = new AuthorizerMapper(null)
+    {
       @Override
       public Authorizer getAuthorizer(String name)
       {
@@ -280,7 +282,12 @@ public class OverlordResourceTest
 
     EasyMock.expect(taskMaster.isLeader()).andReturn(true);
     EasyMock
-        .expect(indexerMetadataStorageAdapter.deletePendingSegments(EasyMock.eq("allow"), EasyMock.anyObject(Interval.class)))
+        .expect(
+            indexerMetadataStorageAdapter.deletePendingSegments(
+                EasyMock.eq("allow"),
+                EasyMock.anyObject(Interval.class)
+            )
+        )
         .andReturn(2);
 
     EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
@@ -291,6 +298,61 @@ public class OverlordResourceTest
     Assert.assertEquals(2, response.get("numDeleted").intValue());
   }
 
+  @Test
+  public void testGetTaskPayload() throws Exception // Covers getTaskPayload for a stored task and for an unknown task ID.
+  {
+    expectAuthorizationTokenCheck();
+    final NoopTask task = NoopTask.create("mydatasource");
+    EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
+            .andReturn(Optional.of(task)); // "mytask" resolves to a stored task
+
+    EasyMock.expect(taskStorageQueryAdapter.getTask("othertask"))
+            .andReturn(Optional.absent()); // "othertask" is unknown to storage
+
+    EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
+
+    final Response response1 = overlordResource.getTaskPayload("mytask");
+    final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue( // JSON round trip, so the assertion covers the serialized form
+        TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
+        TaskPayloadResponse.class
+    );
+    Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1);
+
+    final Response response2 = overlordResource.getTaskPayload("othertask");
+    final TaskPayloadResponse taskPayloadResponse2 = TestHelper.makeJsonMapper().readValue(
+        TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
+        TaskPayloadResponse.class
+    );
+    Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2); // unknown task: ID is kept, payload is null
+  }
+
+  @Test
+  public void testGetTaskStatus() throws Exception // Covers getTaskStatus for a task with a status and for an unknown task ID.
+  {
+    expectAuthorizationTokenCheck();
+    EasyMock.expect(taskStorageQueryAdapter.getStatus("mytask"))
+            .andReturn(Optional.of(TaskStatus.success("mytask"))); // "mytask" has a stored status
+
+    EasyMock.expect(taskStorageQueryAdapter.getStatus("othertask"))
+            .andReturn(Optional.absent()); // "othertask" has no status in storage
+
+    EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
+
+    final Response response1 = overlordResource.getTaskStatus("mytask");
+    final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue( // JSON round trip, so the assertion covers the serialized form
+        TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
+        TaskStatusResponse.class
+    );
+    Assert.assertEquals(new TaskStatusResponse("mytask", TaskStatus.success("mytask")), taskStatusResponse1); // relies on TaskStatus.equals()
+
+    final Response response2 = overlordResource.getTaskStatus("othertask");
+    final TaskStatusResponse taskStatusResponse2 = TestHelper.makeJsonMapper().readValue(
+        TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
+        TaskStatusResponse.class
+    );
+    Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2); // unknown task: ID is kept, status is null
+  }
+
   @After
   public void tearDown()
   {
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 9d95c65..8bfdc3c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.curator.PotentiallyGzippedCompressionProvider;
 import io.druid.curator.discovery.NoopServiceAnnouncer;
 import io.druid.discovery.DruidLeaderSelector;
@@ -56,6 +54,8 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.CloseQuietly;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.server.DruidNode;
 import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
@@ -80,7 +80,6 @@ import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -237,7 +236,7 @@ public class OverlordTest
 
     // Task payload for task_0 should be present in taskStorage
     response = overlordResource.getTaskPayload(taskId_0);
-    Assert.assertEquals(task_0, ((Map) response.getEntity()).get("payload"));
+    Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload());
 
     // Task not present in taskStorage - should fail
     response = overlordResource.getTaskPayload("whatever");
@@ -245,10 +244,10 @@ public class OverlordTest
 
     // Task status of the submitted task should be running
     response = overlordResource.getTaskStatus(taskId_0);
-    Assert.assertEquals(taskId_0, ((Map) response.getEntity()).get("task"));
+    Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
     Assert.assertEquals(
         TaskStatus.running(taskId_0).getStatusCode(),
-        ((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode()
+        ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
     );
 
     // Simulate completion of task_0
@@ -296,7 +295,7 @@ public class OverlordTest
   {
     while (true) {
       Response response = overlordResource.getTaskStatus(taskId);
-      if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) {
+      if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
         break;
       }
       Thread.sleep(10);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org