You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by da...@apache.org on 2019/06/07 23:26:40 UTC
[incubator-druid] branch master updated: Supervisor list api with states and health (#7839)
This is an automated email from the ASF dual-hosted git repository.
davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3fbb0a5 Supervisor list api with states and health (#7839)
3fbb0a5 is described below
commit 3fbb0a5e00c7c546331cbe56d989a50f36710bf5
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Jun 7 16:26:33 2019 -0700
Supervisor list api with states and health (#7839)
* allow optionally listing all supervisors with their state and health
* docs
* add state to full
* clean
* casing
* format
* spelling
---
docs/content/operations/api-reference.md | 14 ++++
.../MaterializedViewSupervisor.java | 6 ++
.../indexing/kafka/supervisor/KafkaSupervisor.java | 6 --
.../kinesis/supervisor/KinesisSupervisor.java | 6 --
.../overlord/supervisor/SupervisorManager.java | 6 ++
.../overlord/supervisor/SupervisorResource.java | 44 +++++++++----
.../supervisor/SeekableStreamSupervisor.java | 13 ++++
.../supervisor/SupervisorResourceTest.java | 74 +++++++++++++++++++++-
.../overlord/supervisor/NoopSupervisorSpec.java | 6 ++
.../indexing/overlord/supervisor/Supervisor.java | 2 +
10 files changed, 149 insertions(+), 28 deletions(-)
diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index 9326f2b..473282f 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -510,8 +510,22 @@ Returns a list of objects of the currently active supervisors.
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
+|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
+|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
+|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
+* `/druid/indexer/v1/supervisor?state=true`
+
+Returns a list of objects of the currently active supervisors and their current state.
+
+|Field|Type|Description|
+|---|---|---|
+|`id`|String|supervisor unique identifier|
+|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
+|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
+|`healthy`|Boolean|true or false indicator of overall supervisor health|
+
* `/druid/indexer/v1/supervisor/<supervisorId>`
Returns the current spec for the supervisor with the provided ID.
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index fb65b37..76883ea 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -241,6 +241,12 @@ public class MaterializedViewSupervisor implements Supervisor
}
@Override
+ public SupervisorStateManager.State getState()
+ {
+ return stateManager.getSupervisorState();
+ }
+
+ @Override
public Boolean isHealthy()
{
return stateManager.isHealthy();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5d419a4..cdf1336 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -385,10 +385,4 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
{
return spec.getIoConfig();
}
-
- @Override
- public Boolean isHealthy()
- {
- return stateManager.isHealthy();
- }
}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 39619a2..5a0c861 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -316,10 +316,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
return true;
}
-
- @Override
- public Boolean isHealthy()
- {
- return stateManager.isHealthy();
- }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 56112d1..5727c4e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -65,6 +65,12 @@ public class SupervisorManager
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.rhs);
}
+ public Optional<SupervisorStateManager.State> getSupervisorState(String id)
+ {
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+ return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
+ }
+
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9d97a80..e7e9daf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -52,6 +52,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -118,6 +119,7 @@ public class SupervisorResource
@Produces(MediaType.APPLICATION_JSON)
public Response specGetAll(
@QueryParam("full") String full,
+ @QueryParam("state") Boolean state,
@Context final HttpServletRequest req
)
{
@@ -128,20 +130,36 @@ public class SupervisorResource
manager,
manager.getSupervisorIds()
);
-
- if (full == null) {
- return Response.ok(authorizedSupervisorIds).build();
- } else {
- List<Map<String, ?>> all =
- authorizedSupervisorIds.stream()
- .map(x -> ImmutableMap.<String, Object>builder()
- .put("id", x)
- .put("spec", manager.getSupervisorSpec(x).get())
- .build()
- )
- .collect(Collectors.toList());
- return Response.ok(all).build();
+ final boolean includeFull = full != null;
+ final boolean includeState = state != null && state;
+
+ if (includeFull || includeState) {
+ List<Map<String, ?>> allStates = authorizedSupervisorIds
+ .stream()
+ .map(x -> {
+ Optional<SupervisorStateManager.State> theState =
+ manager.getSupervisorState(x);
+ ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
+ theBuilder.put("id", x);
+ if (theState.isPresent()) {
+ theBuilder.put("state", theState.get().getBasicState());
+ theBuilder.put("detailedState", theState.get());
+ theBuilder.put("healthy", theState.get().isHealthy());
+ }
+ if (includeFull) {
+ Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
+ if (theSpec.isPresent()) {
+ theBuilder.put("spec", theSpec.get());
+ }
+ }
+ return theBuilder.build();
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ return Response.ok(allStates).build();
}
+
+ return Response.ok(authorizedSupervisorIds).build();
}
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5e7c693..64112d6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -807,6 +807,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return generateReport(true);
}
+
+ @Override
+ public SupervisorStateManager.State getState()
+ {
+ return stateManager.getSupervisorState();
+ }
+
+ @Override
+ public Boolean isHealthy()
+ {
+ return stateManager.isHealthy();
+ }
+
private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(
boolean includeOffsets
)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 96afde5..f6cb0080 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -166,7 +166,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expectLastCall().anyTimes();
replayAll();
- Response response = supervisorResource.specGetAll(null, request);
+ Response response = supervisorResource.specGetAll(null, null, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
@@ -176,7 +176,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.specGetAll(null, request);
+ response = supervisorResource.specGetAll(null, null, request);
verifyAll();
Assert.assertEquals(503, response.getStatus());
@@ -205,11 +205,15 @@ public class SupervisorResourceTest extends EasyMockSupport
return Collections.singletonList("datasource2");
}
};
+ SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
+ SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
+ EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -219,7 +223,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expectLastCall().anyTimes();
replayAll();
- Response response = supervisorResource.specGetAll("", request);
+ Response response = supervisorResource.specGetAll("", null, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
@@ -234,6 +238,70 @@ public class SupervisorResourceTest extends EasyMockSupport
}
@Test
+ public void testSpecGetState()
+ {
+ Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
+ SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
+ {
+
+ @Override
+ public List<String> getDataSources()
+ {
+ return Collections.singletonList("datasource1");
+ }
+ };
+ SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
+ {
+
+ @Override
+ public List<String> getDataSources()
+ {
+ return Collections.singletonList("datasource2");
+ }
+ };
+
+ SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
+ SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
+
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
+ EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
+ EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
+ EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+ EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
+ new AuthenticationResult("druid", "druid", null, null)
+ ).atLeastOnce();
+ request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+ replayAll();
+
+ Response response = supervisorResource.specGetAll(null, true, request);
+ verifyAll();
+
+ Assert.assertEquals(200, response.getStatus());
+ List<Map<String, Object>> states = (List<Map<String, Object>>) response.getEntity();
+ Assert.assertTrue(
+ states.stream()
+ .allMatch(state -> {
+ final String id = (String) state.get("id");
+ if ("id1".equals(id)) {
+ return state1.equals(state.get("state"))
+ && state1.equals(state.get("detailedState"))
+ && (Boolean) state.get("healthy") == state1.isHealthy();
+ } else if ("id2".equals(id)) {
+ return state2.equals(state.get("state"))
+ && state2.equals(state.get("detailedState"))
+ && (Boolean) state.get("healthy") == state2.isHealthy();
+ }
+ return false;
+ })
+ );
+ }
+
+ @Test
public void testSpecGet()
{
SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index d8b7266..3a904b9 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -114,6 +114,12 @@ public class NoopSupervisorSpec implements SupervisorSpec
}
@Override
+ public SupervisorStateManager.State getState()
+ {
+ return SupervisorStateManager.BasicState.RUNNING;
+ }
+
+ @Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index cf3f4d5..c0ecf44 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -39,6 +39,8 @@ public interface Supervisor
SupervisorReport getStatus();
+ SupervisorStateManager.State getState();
+
default Map<String, Map<String, Object>> getStats()
{
return ImmutableMap.of();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org