You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by gi...@apache.org on 2018/07/09 18:23:37 UTC
[incubator-druid] branch 0.12.2 updated: [Backport] Fix missing task type in task payload API (#5941)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new f3aeccf [Backport] Fix missing task type in task payload API (#5941)
f3aeccf is described below
commit f3aeccf79a7427086e9084292ec9479a8c7fa7a2
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 11:23:34 2018 -0700
[Backport] Fix missing task type in task payload API (#5941)
* Fix missing task type in task payload API. (#5399)
* Fix missing task type in task payload API.
Apparently embedding a polymorphic object inside a Map<String, Object> is
a bit too much for Jackson to serialize properly. Fix this by using
wrapper classes.
* Fix OverlordTest casts.
* Remove import.
* Remove unused imports.
* Clarify comments.
* fix test
---
.../java/io/druid/indexing/common/TaskStatus.java | 21 ++++++
.../indexing/overlord/http/OverlordResource.java | 41 +++++++----
.../overlord/http/TaskPayloadResponse.java | 83 +++++++++++++++++++++
.../indexing/overlord/http/TaskStatusResponse.java | 84 ++++++++++++++++++++++
.../overlord/http/OverlordResourceTest.java | 66 ++++++++++++++++-
.../druid/indexing/overlord/http/OverlordTest.java | 13 ++--
6 files changed, 284 insertions(+), 24 deletions(-)
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
index e353595..10ffacc 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java
@@ -153,4 +153,25 @@ public class TaskStatus
.add("duration", duration)
.toString();
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskStatus that = (TaskStatus) o;
+ return getDuration() == that.getDuration() &&
+ java.util.Objects.equals(getId(), that.getId()) &&
+ status == that.status;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return java.util.Objects.hash(getId(), status, getDuration());
+ }
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 4fa507e..8e4be31 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -169,7 +169,12 @@ public class OverlordResource
}
catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
- .entity(ImmutableMap.of("error", StringUtils.format("Task[%s] already exists!", task.getId())))
+ .entity(
+ ImmutableMap.of(
+ "error",
+ StringUtils.format("Task[%s] already exists!", task.getId())
+ )
+ )
.build();
}
}
@@ -209,7 +214,16 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskPayload(@PathParam("taskid") String taskid)
{
- return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
+ final TaskPayloadResponse response = new TaskPayloadResponse(
+ taskid,
+ taskStorageQueryAdapter.getTask(taskid).orNull()
+ );
+
+ final Response.Status status = response.getPayload() == null
+ ? Response.Status.NOT_FOUND
+ : Response.Status.OK;
+
+ return Response.status(status).entity(response).build();
}
@GET
@@ -218,7 +232,16 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskStatus(@PathParam("taskid") String taskid)
{
- return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid));
+ final TaskStatusResponse response = new TaskStatusResponse(
+ taskid,
+ taskStorageQueryAdapter.getStatus(taskid).orNull()
+ );
+
+ final Response.Status status = response.getStatus() == null
+ ? Response.Status.NOT_FOUND
+ : Response.Status.OK;
+
+ return Response.status(status).entity(response).build();
}
@GET
@@ -654,18 +677,6 @@ public class OverlordResource
);
}
- private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
- {
- final Map<String, Object> results = Maps.newHashMap();
- results.put("task", taskid);
- if (x.isPresent()) {
- results.put(objectType, x.get());
- return Response.status(Response.Status.OK).entity(results).build();
- } else {
- return Response.status(Response.Status.NOT_FOUND).entity(results).build();
- }
- }
-
private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java
new file mode 100644
index 0000000..1e7b4e9
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexing.common.task.Task;
+
+import java.util.Objects;
+
+public class TaskPayloadResponse
+{
+ private final String task; // Task ID, named "task" in the JSONification of this class.
+ private final Task payload;
+
+ @JsonCreator
+ public TaskPayloadResponse(
+ @JsonProperty("task") final String task,
+ @JsonProperty("payload") final Task payload
+ )
+ {
+ this.task = task;
+ this.payload = payload;
+ }
+
+ @JsonProperty
+ public String getTask()
+ {
+ return task;
+ }
+
+ @JsonProperty
+ public Task getPayload()
+ {
+ return payload;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskPayloadResponse that = (TaskPayloadResponse) o;
+ return Objects.equals(task, that.task) &&
+ Objects.equals(payload, that.payload);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(task, payload);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskPayloadResponse{" +
+ "task='" + task + '\'' +
+ ", payload=" + payload +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java
new file mode 100644
index 0000000..b15aeb0
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexing.common.TaskStatus;
+
+import java.util.Objects;
+
+public class TaskStatusResponse
+{
+ private final String task; // Task ID, named "task" in the JSONification of this class.
+ private final TaskStatus status;
+
+ @JsonCreator
+ public TaskStatusResponse(
+ @JsonProperty("task") final String task,
+ @JsonProperty("status") final TaskStatus status
+ )
+ {
+ this.task = task;
+ this.status = status;
+ }
+
+ @JsonProperty
+ public String getTask()
+ {
+ return task;
+ }
+
+ @JsonProperty
+ public TaskStatus getStatus()
+ {
+ return status;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskStatusResponse that = (TaskStatusResponse) o;
+ return Objects.equals(task, that.task) &&
+ Objects.equals(status, that.status);
+ }
+
+ @Override
+ public int hashCode()
+ {
+
+ return Objects.hash(task, status);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskStatusResponse{" +
+ "task='" + task + '\'' +
+ ", status=" + status +
+ '}';
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
index 22b9e51..4264186 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -39,6 +39,7 @@ import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.java.util.common.DateTimes;
+import io.druid.segment.TestHelper;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig;
@@ -87,7 +88,8 @@ public class OverlordResourceTest
Optional.of(taskRunner)
).anyTimes();
- AuthorizerMapper authMapper = new AuthorizerMapper(null) {
+ AuthorizerMapper authMapper = new AuthorizerMapper(null)
+ {
@Override
public Authorizer getAuthorizer(String name)
{
@@ -280,7 +282,12 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.isLeader()).andReturn(true);
EasyMock
- .expect(indexerMetadataStorageAdapter.deletePendingSegments(EasyMock.eq("allow"), EasyMock.anyObject(Interval.class)))
+ .expect(
+ indexerMetadataStorageAdapter.deletePendingSegments(
+ EasyMock.eq("allow"),
+ EasyMock.anyObject(Interval.class)
+ )
+ )
.andReturn(2);
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
@@ -291,6 +298,61 @@ public class OverlordResourceTest
Assert.assertEquals(2, response.get("numDeleted").intValue());
}
+ @Test
+ public void testGetTaskPayload() throws Exception
+ {
+ expectAuthorizationTokenCheck();
+ final NoopTask task = NoopTask.create("mydatasource");
+ EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
+ .andReturn(Optional.of(task));
+
+ EasyMock.expect(taskStorageQueryAdapter.getTask("othertask"))
+ .andReturn(Optional.absent());
+
+ EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
+
+ final Response response1 = overlordResource.getTaskPayload("mytask");
+ final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue(
+ TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
+ TaskPayloadResponse.class
+ );
+ Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1);
+
+ final Response response2 = overlordResource.getTaskPayload("othertask");
+ final TaskPayloadResponse taskPayloadResponse2 = TestHelper.makeJsonMapper().readValue(
+ TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
+ TaskPayloadResponse.class
+ );
+ Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2);
+ }
+
+ @Test
+ public void testGetTaskStatus() throws Exception
+ {
+ expectAuthorizationTokenCheck();
+ EasyMock.expect(taskStorageQueryAdapter.getStatus("mytask"))
+ .andReturn(Optional.of(TaskStatus.success("mytask")));
+
+ EasyMock.expect(taskStorageQueryAdapter.getStatus("othertask"))
+ .andReturn(Optional.absent());
+
+ EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
+
+ final Response response1 = overlordResource.getTaskStatus("mytask");
+ final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue(
+ TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
+ TaskStatusResponse.class
+ );
+ Assert.assertEquals(new TaskStatusResponse("mytask", TaskStatus.success("mytask")), taskStatusResponse1);
+
+ final Response response2 = overlordResource.getTaskStatus("othertask");
+ final TaskStatusResponse taskStatusResponse2 = TestHelper.makeJsonMapper().readValue(
+ TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
+ TaskStatusResponse.class
+ );
+ Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2);
+ }
+
@After
public void tearDown()
{
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 9d95c65..8bfdc3c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
@@ -56,6 +54,8 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.CloseQuietly;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.metrics.NoopServiceEmitter;
@@ -80,7 +80,6 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -237,7 +236,7 @@ public class OverlordTest
// Task payload for task_0 should be present in taskStorage
response = overlordResource.getTaskPayload(taskId_0);
- Assert.assertEquals(task_0, ((Map) response.getEntity()).get("payload"));
+ Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload());
// Task not present in taskStorage - should fail
response = overlordResource.getTaskPayload("whatever");
@@ -245,10 +244,10 @@ public class OverlordTest
// Task status of the submitted task should be running
response = overlordResource.getTaskStatus(taskId_0);
- Assert.assertEquals(taskId_0, ((Map) response.getEntity()).get("task"));
+ Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
Assert.assertEquals(
TaskStatus.running(taskId_0).getStatusCode(),
- ((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode()
+ ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
);
// Simulate completion of task_0
@@ -296,7 +295,7 @@ public class OverlordTest
{
while (true) {
Response response = overlordResource.getTaskStatus(taskId);
- if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) {
+ if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
break;
}
Thread.sleep(10);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org