You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by da...@apache.org on 2019/06/07 23:26:40 UTC

[incubator-druid] branch master updated: Supervisor list api with states and health (#7839)

This is an automated email from the ASF dual-hosted git repository.

davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fbb0a5  Supervisor list api with states and health (#7839)
3fbb0a5 is described below

commit 3fbb0a5e00c7c546331cbe56d989a50f36710bf5
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Jun 7 16:26:33 2019 -0700

    Supervisor list api with states and health (#7839)
    
    * allow optionally listing all supervisors with their state and health
    
    * docs
    
    * add state to full
    
    * clean
    
    * casing
    
    * format
    
    * spelling
---
 docs/content/operations/api-reference.md           | 14 ++++
 .../MaterializedViewSupervisor.java                |  6 ++
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  6 --
 .../kinesis/supervisor/KinesisSupervisor.java      |  6 --
 .../overlord/supervisor/SupervisorManager.java     |  6 ++
 .../overlord/supervisor/SupervisorResource.java    | 44 +++++++++----
 .../supervisor/SeekableStreamSupervisor.java       | 13 ++++
 .../supervisor/SupervisorResourceTest.java         | 74 +++++++++++++++++++++-
 .../overlord/supervisor/NoopSupervisorSpec.java    |  6 ++
 .../indexing/overlord/supervisor/Supervisor.java   |  2 +
 10 files changed, 149 insertions(+), 28 deletions(-)

diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index 9326f2b..473282f 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -510,8 +510,22 @@ Returns a list of objects of the currently active supervisors.
 |Field|Type|Description|
 |---|---|---|
 |`id`|String|supervisor unique identifier|
+|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
+|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
+|`healthy`|Boolean|true or false indicator of overall supervisor health|
 |`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
 
+* `/druid/indexer/v1/supervisor?state=true`
+
+Returns a list of objects of the currently active supervisors and their current state.
+
+|Field|Type|Description|
+|---|---|---|
+|`id`|String|supervisor unique identifier|
+|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
+|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
+|`healthy`|Boolean|true or false indicator of overall supervisor health|
+
 * `/druid/indexer/v1/supervisor/<supervisorId>`
 
 Returns the current spec for the supervisor with the provided ID.
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index fb65b37..76883ea 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -241,6 +241,12 @@ public class MaterializedViewSupervisor implements Supervisor
   }
 
   @Override
+  public SupervisorStateManager.State getState()
+  {
+    return stateManager.getSupervisorState();
+  }
+
+  @Override
   public Boolean isHealthy()
   {
     return stateManager.isHealthy();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5d419a4..cdf1336 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -385,10 +385,4 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
   {
     return spec.getIoConfig();
   }
-
-  @Override
-  public Boolean isHealthy()
-  {
-    return stateManager.isHealthy();
-  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 39619a2..5a0c861 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -316,10 +316,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
   {
     return true;
   }
-
-  @Override
-  public Boolean isHealthy()
-  {
-    return stateManager.isHealthy();
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 56112d1..5727c4e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -65,6 +65,12 @@ public class SupervisorManager
     return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.rhs);
   }
 
+  public Optional<SupervisorStateManager.State> getSupervisorState(String id)
+  {
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+    return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
+  }
+
   public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9d97a80..e7e9daf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -52,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -118,6 +119,7 @@ public class SupervisorResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response specGetAll(
       @QueryParam("full") String full,
+      @QueryParam("state") Boolean state,
       @Context final HttpServletRequest req
   )
   {
@@ -128,20 +130,36 @@ public class SupervisorResource
               manager,
               manager.getSupervisorIds()
           );
-
-          if (full == null) {
-            return Response.ok(authorizedSupervisorIds).build();
-          } else {
-            List<Map<String, ?>> all =
-                authorizedSupervisorIds.stream()
-                                       .map(x -> ImmutableMap.<String, Object>builder()
-                                           .put("id", x)
-                                           .put("spec", manager.getSupervisorSpec(x).get())
-                                           .build()
-                                       )
-                                       .collect(Collectors.toList());
-            return Response.ok(all).build();
+          final boolean includeFull = full != null;
+          final boolean includeState = state != null && state;
+
+          if (includeFull || includeState) {
+            List<Map<String, ?>> allStates = authorizedSupervisorIds
+                .stream()
+                .map(x -> {
+                  Optional<SupervisorStateManager.State> theState =
+                      manager.getSupervisorState(x);
+                  ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
+                  theBuilder.put("id", x);
+                  if (theState.isPresent()) {
+                    theBuilder.put("state", theState.get().getBasicState());
+                    theBuilder.put("detailedState", theState.get());
+                    theBuilder.put("healthy", theState.get().isHealthy());
+                  }
+                  if (includeFull) {
+                    Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
+                    if (theSpec.isPresent()) {
+                      theBuilder.put("spec", theSpec.get());
+                    }
+                  }
+                  return theBuilder.build();
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+            return Response.ok(allStates).build();
           }
+
+          return Response.ok(authorizedSupervisorIds).build();
         }
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5e7c693..64112d6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -807,6 +807,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return generateReport(true);
   }
 
+
+  @Override
+  public SupervisorStateManager.State getState()
+  {
+    return stateManager.getSupervisorState();
+  }
+
+  @Override
+  public Boolean isHealthy()
+  {
+    return stateManager.isHealthy();
+  }
+
   private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(
       boolean includeOffsets
   )
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 96afde5..f6cb0080 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -166,7 +166,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     EasyMock.expectLastCall().anyTimes();
     replayAll();
 
-    Response response = supervisorResource.specGetAll(null, request);
+    Response response = supervisorResource.specGetAll(null, null, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -176,7 +176,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.specGetAll(null, request);
+    response = supervisorResource.specGetAll(null, null, request);
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
@@ -205,11 +205,15 @@ public class SupervisorResourceTest extends EasyMockSupport
         return Collections.singletonList("datasource2");
       }
     };
+    SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
+    SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
 
     EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
     EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
     EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
     EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
+    EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
+    EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
     EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
     EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
     EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -219,7 +223,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     EasyMock.expectLastCall().anyTimes();
     replayAll();
 
-    Response response = supervisorResource.specGetAll("", request);
+    Response response = supervisorResource.specGetAll("", null, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -234,6 +238,70 @@ public class SupervisorResourceTest extends EasyMockSupport
   }
 
   @Test
+  public void testSpecGetState()
+  {
+    Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
+    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
+    {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
+    {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource2");
+      }
+    };
+
+    SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
+    SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
+    EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(1);
+    EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(1);
+    EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
+    EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
+        new AuthenticationResult("druid", "druid", null, null)
+    ).atLeastOnce();
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specGetAll(null, true, request);
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    List<Map<String, Object>> states = (List<Map<String, Object>>) response.getEntity();
+    Assert.assertTrue(
+        states.stream()
+             .allMatch(state -> {
+               final String id = (String) state.get("id");
+               if ("id1".equals(id)) {
+                 return state1.equals(state.get("state"))
+                        && state1.equals(state.get("detailedState"))
+                        && (Boolean) state.get("healthy") == state1.isHealthy();
+               } else if ("id2".equals(id)) {
+                 return state2.equals(state.get("state"))
+                        && state2.equals(state.get("detailedState"))
+                        && (Boolean) state.get("healthy") == state2.isHealthy();
+               }
+               return false;
+             })
+    );
+  }
+
+  @Test
   public void testSpecGet()
   {
     SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index d8b7266..3a904b9 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -114,6 +114,12 @@ public class NoopSupervisorSpec implements SupervisorSpec
       }
 
       @Override
+      public SupervisorStateManager.State getState()
+      {
+        return SupervisorStateManager.BasicState.RUNNING;
+      }
+
+      @Override
       public void reset(DataSourceMetadata dataSourceMetadata)
       {
       }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index cf3f4d5..c0ecf44 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -39,6 +39,8 @@ public interface Supervisor
 
   SupervisorReport getStatus();
 
+  SupervisorStateManager.State getState();
+
   default Map<String, Map<String, Object>> getStats()
   {
     return ImmutableMap.of();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org