You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/06/01 00:16:11 UTC

[incubator-druid] branch master updated: Add errors and state to stream supervisor status API endpoint (#7428)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8032c4a  Add errors and state to stream supervisor status API endpoint (#7428)
8032c4a is described below

commit 8032c4add8f78d0c15044d0847201c618e27dc25
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Fri May 31 20:16:01 2019 -0400

    Add errors and state to stream supervisor status API endpoint (#7428)
    
    * Add state and error tracking for seekable stream supervisors
    
    * Fixed nits in docs
    
    * Made inner class static and updated spec test with jackson inject
    
    * Review changes
    
    * Remove redundant config param in supervisor
    
    * Style
    
    * Applied some of Jon's recommendations
    
    * Add transience field
    
    * write test
    
    * implement code review changes except for reconsidering logic of markRunFinishedAndEvaluateHealth()
    
    * remove transience reporting and fix SeekableStreamSupervisorStateManager impl
    
    * move call to stateManager.markRunFinished() from RunNotice to runInternal() for tests
    
    * remove stateHistory because it wasn't adding much value, some fixes, and add more tests
    
    * fix tests
    
    * code review changes and add HTTP health check status
    
    * fix test failure
    
    * refactor to split into a generic SupervisorStateManager and a specific SeekableStreamSupervisorStateManager
    
    * fixup after merge
    
    * code review changes - add additional docs
    
    * cleanup KafkaIndexTaskTest
    
    * add additional documentation for Kinesis indexing
    
    * remove unused throws class
---
 .../apache/druid/guice/DruidSecondaryModule.java   |   2 -
 .../org/apache/druid/utils/CircularBufferTest.java |  93 +++
 docs/content/configuration/index.md                |  11 +
 .../development/extensions-core/kafka-ingestion.md |  49 ++
 .../extensions-core/kinesis-ingestion.md           |  56 +-
 .../MaterializedViewSupervisor.java                |  63 +-
 .../MaterializedViewSupervisorReport.java          |  33 +-
 .../MaterializedViewSupervisorSpec.java            | 102 ++-
 .../MaterializedViewSupervisorSpecTest.java        |  11 +-
 .../MaterializedViewSupervisorTest.java            |   7 +-
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  82 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  12 +-
 .../supervisor/KafkaSupervisorReportPayload.java   |  23 +-
 .../kafka/supervisor/KafkaSupervisorSpec.java      |  11 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  21 +-
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   1 +
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  |   2 +
 .../kafka/supervisor/KafkaSupervisorTest.java      |  64 +-
 .../indexing/kinesis/KinesisRecordSupplier.java    |  54 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |  12 +-
 .../supervisor/KinesisSupervisorReportPayload.java |  20 +-
 .../kinesis/supervisor/KinesisSupervisorSpec.java  |  10 +-
 .../indexing/kinesis/KinesisSamplerSpecTest.java   |   1 +
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  94 ++-
 .../druid/indexing/common/task/IndexTask.java      |   4 +-
 .../apache/druid/indexing/overlord/TaskQueue.java  |   2 +-
 .../overlord/supervisor/SupervisorManager.java     |   6 +
 .../overlord/supervisor/SupervisorResource.java    |  28 +-
 .../seekablestream/common/StreamException.java     |  28 +
 .../supervisor/SeekableStreamSupervisor.java       | 122 ++-
 .../SeekableStreamSupervisorReportPayload.java     |  41 +-
 .../supervisor/SeekableStreamSupervisorSpec.java   |  11 +-
 .../SeekableStreamSupervisorStateManager.java      | 114 +++
 .../supervisor/SupervisorResourceTest.java         |  64 +-
 .../SeekableStreamSupervisorStateManagerTest.java  | 320 ++++++++
 .../SeekableStreamSupervisorStateTest.java         | 914 +++++++++++++++++++++
 .../indexing/overlord/supervisor/Supervisor.java   |   6 +
 .../overlord/supervisor/SupervisorModule.java      |  34 +
 .../supervisor/SupervisorStateManager.java         | 288 +++++++
 .../supervisor/SupervisorStateManagerConfig.java   |  88 ++
 .../appenderator/BaseAppenderatorDriver.java       |   2 +-
 .../coordination/ChangeRequestHistoryTest.java     |  58 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   6 +-
 43 files changed, 2621 insertions(+), 349 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
index cded7dd..319c0d1 100644
--- a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
+++ b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
@@ -34,8 +34,6 @@ import org.skife.config.ConfigurationObjectFactory;
 import javax.validation.Validator;
 import java.util.Properties;
 
-/**
- */
 public class DruidSecondaryModule implements Module
 {
   private final Properties properties;
diff --git a/core/src/test/java/org/apache/druid/utils/CircularBufferTest.java b/core/src/test/java/org/apache/druid/utils/CircularBufferTest.java
new file mode 100644
index 0000000..9a2fbed
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/utils/CircularBufferTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class CircularBufferTest
+{
+  @Test
+  public void testCircularBufferGetLatest()
+  {
+    CircularBuffer<Integer> buff = new CircularBuffer(4);
+
+    for (int i = 1; i <= 9; i++) {
+      buff.add(i); // buffer should contain [9, 6, 7, 8]
+    }
+    for (int i = 0; i < 4; i++) {
+      Assert.assertEquals((Integer) (9 - i), buff.getLatest(i));
+    }
+  }
+
+  @Test
+  public void testCircularBufferGet()
+  {
+    CircularBuffer<Integer> circularBuffer = new CircularBuffer<>(
+        3);
+
+    circularBuffer.add(1);
+    Assert.assertEquals(1, circularBuffer.size());
+    Assert.assertEquals(1, (int) circularBuffer.get(0));
+
+    circularBuffer.add(2);
+    Assert.assertEquals(2, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(3);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(4);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 2, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(5);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 3, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(6);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 4, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(7);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 5, (int) circularBuffer.get(i));
+    }
+
+    circularBuffer.add(8);
+    Assert.assertEquals(3, circularBuffer.size());
+    for (int i = 0; i < circularBuffer.size(); i++) {
+      Assert.assertEquals(i + 6, (int) circularBuffer.get(i));
+    }
+  }
+}
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 25e0ed9..41f2dd5 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -943,6 +943,17 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
 |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|
 
+##### Supervisors
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.supervisor.healthinessThreshold`|The number of successful runs before an unhealthy supervisor is again considered healthy.|3|
+|`druid.supervisor.unhealthinessThreshold`|The number of failed runs before the supervisor is considered unhealthy.|3|
+|`druid.supervisor.taskHealthinessThreshold`|The number of consecutive task successes before an unhealthy supervisor is again considered healthy.|3|
+|`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
+|`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
+|`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+
 #### Overlord Dynamic Configuration
 
 The Overlord can dynamically change worker behavior.
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index b415c27..c070e46 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -214,12 +214,61 @@ offsets as reported by Kafka, the consumer lag per partition, as well as the agg
 consumer lag per partition may be reported as negative values if the supervisor has not received a recent latest offset
 response from Kafka. The aggregate lag value will always be >= 0.
 
+The status report also contains the supervisor's state and a list of recently thrown exceptions (reported as
+`recentErrors`, whose max size can be controlled using the `druid.supervisor.maxStoredExceptionEvents` configuration).
+There are two fields related to the supervisor's state - `state` and `detailedState`. The `state` field will always be
+one of a small number of generic states that are applicable to any type of supervisor, while the `detailedState` field
+will contain a more descriptive, implementation-specific state that may provide more insight into the supervisor's
+activities than the generic `state` field.
+
+The possible `state` values are: [`PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`, `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`]
+
+The list of `detailedState` values and their corresponding `state` mapping is as follows:
+
+|Detailed State|Corresponding State|Description|
+|--------------|-------------------|-----------|
+|UNHEALTHY_SUPERVISOR|UNHEALTHY_SUPERVISOR|The supervisor has encountered errors in each of the past `druid.supervisor.unhealthinessThreshold` iterations|
+|UNHEALTHY_TASKS|UNHEALTHY_TASKS|The last `druid.supervisor.taskUnhealthinessThreshold` tasks have all failed|
+|UNABLE_TO_CONNECT_TO_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kafka and has not successfully connected in the past|
+|LOST_CONTACT_WITH_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kafka but has successfully connected in the past|
+|PENDING (first iteration only)|PENDING|The supervisor has been initialized and hasn't started connecting to the stream|
+|CONNECTING_TO_STREAM (first iteration only)|RUNNING|The supervisor is trying to connect to the stream and update partition data|
+|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
+|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
+|RUNNING|RUNNING|The supervisor has started tasks and is waiting for `taskDuration` to elapse|
+|SUSPENDED|SUSPENDED|The supervisor has been suspended|
+|STOPPING|STOPPING|The supervisor is stopping|
+
+On each iteration of the supervisor's run loop, the supervisor completes the following tasks in sequence:
+  1) Fetch the list of partitions from Kafka and determine the starting offset for each partition (either based on the
+  last processed offset if continuing, or starting from the beginning or ending of the stream if this is a new topic).
+  2) Discover any running indexing tasks that are writing to the supervisor's datasource and adopt them if they match
+  the supervisor's configuration, else signal them to stop.
+  3) Send a status request to each supervised task to update the supervisor's view of the state of the tasks it manages.
+  4) Handle tasks that have exceeded `taskDuration` and should transition from the reading to publishing state.
+  5) Handle tasks that have finished publishing and signal redundant replica tasks to stop.
+  6) Handle tasks that have failed and clean up the supervisor's internal state.
+  7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required.
+
+The `detailedState` field will show additional values (those marked with "first iteration only") the first time the
+supervisor executes this run loop after startup or after resuming from a suspension. This is intended to surface
+initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
+Kafka, it can't read from the Kafka topic, or it can't communicate with existing tasks). Once the supervisor is stable -
+that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
+state until it is stopped, suspended, or hits a failure threshold and transitions to an unhealthy state.
+
 ### Getting Supervisor Ingestion Stats Report
 
 `GET /druid/indexer/v1/supervisor/<supervisorId>/stats` returns a snapshot of the current ingestion row counters for each task being managed by the supervisor, along with moving averages for the row counters.
 
 See [Task Reports: Row Stats](../../ingestion/reports.html#row-stats) for more information.
 
+### Supervisor Health Check
+
+`GET /druid/indexer/v1/supervisor/<supervisorId>/health` returns `200 OK` if the supervisor is healthy and
+`503 Service Unavailable` if it is unhealthy. Healthiness is determined by the supervisor's `state` (as returned by the
+`/status` endpoint) and the `druid.supervisor.*` Overlord configuration thresholds.
+
 ### Updating Existing Supervisors
 
 `POST /druid/indexer/v1/supervisor` can be used to update existing supervisor spec.
diff --git a/docs/content/development/extensions-core/kinesis-ingestion.md b/docs/content/development/extensions-core/kinesis-ingestion.md
index 3d406ed..0578dd2 100644
--- a/docs/content/development/extensions-core/kinesis-ingestion.md
+++ b/docs/content/development/extensions-core/kinesis-ingestion.md
@@ -113,7 +113,7 @@ A sample supervisor spec is shown below:
 }
 ```
 
-## Supervisor Configuration
+## Supervisor Spec
 
 |Field|Description|Required|
 |--------|-----------|---------|
@@ -218,12 +218,58 @@ To authenticate with AWS, you must provide your AWS access key and AWS secret ke
 ```
 -Ddruid.kinesis.accessKey=123 -Ddruid.kinesis.secretKey=456
 ```
-The AWS access key ID and secret access key are used for Kinesis API requests. If this is not provided, the service will look for credentials set in environment variables, in the default profile configuration file, and from the EC2 instance profile provider (in this order).
+The AWS access key ID and secret access key are used for Kinesis API requests. If this is not provided, the service will
+look for credentials set in environment variables, in the default profile configuration file, and from the EC2 instance
+profile provider (in this order).
 
 ### Getting Supervisor Status Report
 
-`GET /druid/indexer/v1/supervisor/<supervisorId>/status` returns a snapshot report of the current state of the tasks managed by the given supervisor. This includes the latest
-sequence numbers as reported by Kinesis. Unlike the Kafka Indexing Service, stats about lag is not yet supported.
+`GET /druid/indexer/v1/supervisor/<supervisorId>/status` returns a snapshot report of the current state of the tasks 
+managed by the given supervisor. This includes the latest sequence numbers as reported by Kinesis. Unlike the Kafka
+Indexing Service, stats about lag are not yet supported.
+
+The status report also contains the supervisor's state and a list of recently thrown exceptions (reported as
+`recentErrors`, whose max size can be controlled using the `druid.supervisor.maxStoredExceptionEvents` configuration).
+There are two fields related to the supervisor's state - `state` and `detailedState`. The `state` field will always be
+one of a small number of generic states that are applicable to any type of supervisor, while the `detailedState` field
+will contain a more descriptive, implementation-specific state that may provide more insight into the supervisor's
+activities than the generic `state` field.
+
+The possible `state` values are: [`PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`, `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`]
+
+The list of `detailedState` values and their corresponding `state` mapping is as follows:
+
+|Detailed State|Corresponding State|Description|
+|--------------|-------------------|-----------|
+|UNHEALTHY_SUPERVISOR|UNHEALTHY_SUPERVISOR|The supervisor has encountered errors in each of the past `druid.supervisor.unhealthinessThreshold` iterations|
+|UNHEALTHY_TASKS|UNHEALTHY_TASKS|The last `druid.supervisor.taskUnhealthinessThreshold` tasks have all failed|
+|UNABLE_TO_CONNECT_TO_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kinesis and has not successfully connected in the past|
+|LOST_CONTACT_WITH_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kinesis but has successfully connected in the past|
+|PENDING (first iteration only)|PENDING|The supervisor has been initialized and hasn't started connecting to the stream|
+|CONNECTING_TO_STREAM (first iteration only)|RUNNING|The supervisor is trying to connect to the stream and update partition data|
+|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
+|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
+|RUNNING|RUNNING|The supervisor has started tasks and is waiting for `taskDuration` to elapse|
+|SUSPENDED|SUSPENDED|The supervisor has been suspended|
+|STOPPING|STOPPING|The supervisor is stopping|
+
+On each iteration of the supervisor's run loop, the supervisor completes the following tasks in sequence:
+  1) Fetch the list of shards from Kinesis and determine the starting sequence number for each shard (either based on the
+  last processed sequence number if continuing, or starting from the beginning or ending of the stream if this is a new stream).
+  2) Discover any running indexing tasks that are writing to the supervisor's datasource and adopt them if they match
+  the supervisor's configuration, else signal them to stop.
+  3) Send a status request to each supervised task to update the supervisor's view of the state of the tasks it manages.
+  4) Handle tasks that have exceeded `taskDuration` and should transition from the reading to publishing state.
+  5) Handle tasks that have finished publishing and signal redundant replica tasks to stop.
+  6) Handle tasks that have failed and clean up the supervisor's internal state.
+  7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required.
+
+The `detailedState` field will show additional values (those marked with "first iteration only") the first time the
+supervisor executes this run loop after startup or after resuming from a suspension. This is intended to surface
+initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
+Kinesis, it can't read from the stream, or it can't communicate with existing tasks). Once the supervisor is stable -
+that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
+state until it is stopped, suspended, or hits a failure threshold and transitions to an unhealthy state.
 
 ### Updating Existing Supervisors
 
@@ -390,4 +436,4 @@ requires the user to manually provide the Kinesis Client Library on the classpat
 compatible with Apache projects.
 
 To enable this feature, add the `amazon-kinesis-client` (tested on version `1.9.2`) jar file ([link](https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.9.2)) under `dist/druid/extensions/druid-kinesis-indexing-service/`.
-Then when submitting a supervisor-spec, set `deaggregate` to true.
\ No newline at end of file
+Then when submitting a supervisor-spec, set `deaggregate` to true.
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 105afdf..c1033ae 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
@@ -77,6 +78,7 @@ public class MaterializedViewSupervisor implements Supervisor
   private final TaskMaster taskMaster;
   private final TaskStorage taskStorage;
   private final MaterializedViewTaskConfig config;
+  private final SupervisorStateManager stateManager;
   private final String dataSource;
   private final String supervisorId;
   private final int maxTaskCount;
@@ -93,7 +95,7 @@ public class MaterializedViewSupervisor implements Supervisor
   // In the missing intervals, baseDataSource has data but derivedDataSource does not, which means
   // data in these intervals of derivedDataSource needs to be rebuilt.
   private Set<Interval> missInterval = new HashSet<>();
-  
+
   public MaterializedViewSupervisor(
       TaskMaster taskMaster,
       TaskStorage taskStorage,
@@ -111,6 +113,7 @@ public class MaterializedViewSupervisor implements Supervisor
     this.metadataSupervisorManager = metadataSupervisorManager;
     this.config = config;
     this.spec = spec;
+    this.stateManager = new SupervisorStateManager(spec.getSupervisorStateManagerConfig(), spec.isSuspended());
     this.dataSource = spec.getDataSourceName();
     this.supervisorId = StringUtils.format("MaterializedViewSupervisor-%s", dataSource);
     this.maxTaskCount = spec.getContext().containsKey("maxTaskCount")
@@ -120,17 +123,17 @@ public class MaterializedViewSupervisor implements Supervisor
         ? Long.parseLong(String.valueOf(spec.getContext().get("minDataLagMs")))
         : DEFAULT_MIN_DATA_LAG_MS;
   }
-  
+
   @Override
-  public void start() 
+  public void start()
   {
     synchronized (stateLock) {
       Preconditions.checkState(!started, "already started");
-      
+
       DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
       if (null == metadata) {
         metadataStorageCoordinator.insertDataSourceMetadata(
-            dataSource, 
+            dataSource,
             new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
         );
       }
@@ -175,15 +178,22 @@ public class MaterializedViewSupervisor implements Supervisor
       }
     }
     catch (Exception e) {
+      stateManager.recordThrowableEvent(e);
       log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit();
     }
+    finally {
+      stateManager.markRunFinished();
+    }
   }
 
   @Override
-  public void stop(boolean stopGracefully) 
+  public void stop(boolean stopGracefully)
   {
     synchronized (stateLock) {
       Preconditions.checkState(started, "not started");
+
+      stateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING);
+
       // stop all schedulers and threads
       if (stopGracefully) {
         synchronized (taskLock) {
@@ -214,7 +224,7 @@ public class MaterializedViewSupervisor implements Supervisor
   }
 
   @Override
-  public SupervisorReport getStatus() 
+  public SupervisorReport getStatus()
   {
     return new MaterializedViewSupervisorReport(
         dataSource,
@@ -223,11 +233,20 @@ public class MaterializedViewSupervisor implements Supervisor
         spec.getBaseDataSource(),
         spec.getDimensions(),
         spec.getMetrics(),
-        JodaUtils.condenseIntervals(missInterval)
+        JodaUtils.condenseIntervals(missInterval),
+        stateManager.isHealthy(),
+        stateManager.getSupervisorState().getBasicState(),
+        stateManager.getExceptionEvents()
     );
   }
 
   @Override
+  public Boolean isHealthy()
+  {
+    return stateManager.isHealthy();
+  }
+
+  @Override
   public void reset(DataSourceMetadata dataSourceMetadata)
   {
     if (dataSourceMetadata == null) {
@@ -293,7 +312,7 @@ public class MaterializedViewSupervisor implements Supervisor
       submitTasks(sortedToBuildVersion, baseSegments);
     }
   }
-  
+
   @VisibleForTesting
   Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks()
   {
@@ -311,7 +330,7 @@ public class MaterializedViewSupervisor implements Supervisor
    *
    * @return the left part of Pair: interval -> version, and the right part: interval -> DataSegment list.
    *          Version and DataSegment list can be used to create HadoopIndexTask.
-   *          Derived datasource data in all these intervals need to be rebuilt. 
+   *          Derived datasource data in all these intervals need to be rebuilt.
    */
   @VisibleForTesting
   Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegments()
@@ -347,13 +366,13 @@ public class MaterializedViewSupervisor implements Supervisor
     // if some intervals are in running tasks and the versions are the same, remove it from toBuildInterval
     // if some intervals are in running tasks, but the versions are different, stop the task. 
     for (Interval interval : runningVersion.keySet()) {
-      if (toBuildInterval.containsKey(interval) 
+      if (toBuildInterval.containsKey(interval)
           && toBuildInterval.get(interval).equals(runningVersion.get(interval))
           ) {
         toBuildInterval.remove(interval);
 
       } else if (
-          toBuildInterval.containsKey(interval) 
+          toBuildInterval.containsKey(interval)
           && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
       ) {
         if (taskMaster.getTaskQueue().isPresent()) {
@@ -374,7 +393,7 @@ public class MaterializedViewSupervisor implements Supervisor
   }
 
   private void submitTasks(
-      SortedMap<Interval, String> sortedToBuildVersion, 
+      SortedMap<Interval, String> sortedToBuildVersion,
       Map<Interval, List<DataSegment>> baseSegments
   )
   {
@@ -397,7 +416,7 @@ public class MaterializedViewSupervisor implements Supervisor
       }
     }
   }
-  
+
   private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getVersionAndBaseSegments(
       List<DataSegment> snapshot
   )
@@ -412,7 +431,7 @@ public class MaterializedViewSupervisor implements Supervisor
     }
     return new Pair<>(versions, segments);
   }
-  
+
   private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getMaxCreateDateAndBaseSegments(
       List<Pair<DataSegment, String>> snapshot
   )
@@ -432,9 +451,9 @@ public class MaterializedViewSupervisor implements Supervisor
         continue;
       }
       maxCreatedDate.put(
-          interval, 
+          interval,
           DateTimes.max(
-              DateTimes.of(createDate), 
+              DateTimes.of(createDate),
               DateTimes.of(maxCreatedDate.getOrDefault(interval, DateTimes.MIN.toString()))
           ).toString()
       );
@@ -457,8 +476,8 @@ public class MaterializedViewSupervisor implements Supervisor
   {
     return minDataLagMs <= (maxInterval.getStartMillis() - target.getStartMillis());
   }
-  
-  private void clearTasks() 
+
+  private void clearTasks()
   {
     for (HadoopIndexTask task : runningTasks.values()) {
       if (taskMaster.getTaskQueue().isPresent()) {
@@ -468,7 +487,7 @@ public class MaterializedViewSupervisor implements Supervisor
     runningTasks.clear();
     runningVersion.clear();
   }
-  
+
   private void clearSegments()
   {
     log.info("Clear all metadata of dataSource %s", dataSource);
@@ -476,7 +495,7 @@ public class MaterializedViewSupervisor implements Supervisor
     segmentManager.removeDataSource(dataSource);
     metadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
   }
-  
+
   private void commitDataSourceMetadata(DataSourceMetadata dataSourceMetadata)
   {
     if (!metadataStorageCoordinator.insertDataSourceMetadata(dataSource, dataSourceMetadata)) {
@@ -485,7 +504,7 @@ public class MaterializedViewSupervisor implements Supervisor
             dataSource,
             dataSourceMetadata
         );
-      } 
+      }
       catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
index f05f8e0..13e51da 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
@@ -19,17 +19,18 @@
 
 package org.apache.druid.indexing.materializedview;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.List;
 import java.util.Set;
 
-public class MaterializedViewSupervisorReport extends SupervisorReport 
+public class MaterializedViewSupervisorReport extends SupervisorReport
 {
-
   public MaterializedViewSupervisorReport(
       String dataSource,
       DateTime generationTime,
@@ -37,16 +38,26 @@ public class MaterializedViewSupervisorReport extends SupervisorReport
       String baseDataSource,
       Set<String> dimensions,
       Set<String> metrics,
-      List<Interval> missTimeline
+      List<Interval> missTimeline,
+      boolean healthy,
+      SupervisorStateManager.State state,
+      List<SupervisorStateManager.ExceptionEvent> recentErrors
   )
   {
-    super(dataSource, generationTime, "{" +
-        "dataSource='" + dataSource + '\'' +
-        ", baseDataSource='" + baseDataSource + '\'' +
-        ", suspended='" + suspended + "\'" +
-        ", dimensions=" + dimensions +
-        ", metrics=" + metrics +
-        ", missTimeline" + Sets.newHashSet(missTimeline) +
-        "}");
+    super(
+        dataSource,
+        generationTime,
+        ImmutableMap.builder()
+                    .put("dataSource", dataSource)
+                    .put("baseDataSource", baseDataSource)
+                    .put("suspended", suspended)
+                    .put("dimensions", dimensions)
+                    .put("metrics", metrics)
+                    .put("missTimeline", Sets.newHashSet(missTimeline))
+                    .put("healthy", healthy)
+                    .put("state", state)
+                    .put("recentErrors", recentErrors)
+                    .build()
+    );
   }
 }
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index 02f07a5..4258fc9 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -58,7 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class MaterializedViewSupervisorSpec implements SupervisorSpec 
+public class MaterializedViewSupervisorSpec implements SupervisorSpec
 {
   private static final String TASK_PREFIX = "index_materialized_view";
   private final String baseDataSource;
@@ -81,8 +82,9 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
   private final MaterializedViewTaskConfig config;
   private final AuthorizerMapper authorizerMapper;
   private final ChatHandlerProvider chatHandlerProvider;
+  private final SupervisorStateManagerConfig supervisorStateManagerConfig;
   private final boolean suspended;
-  
+
   public MaterializedViewSupervisorSpec(
       @JsonProperty("baseDataSource") String baseDataSource,
       @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@@ -102,31 +104,35 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
       @JacksonInject IndexerMetadataStorageCoordinator metadataStorageCoordinator,
       @JacksonInject MaterializedViewTaskConfig config,
       @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject ChatHandlerProvider chatHandlerProvider
+      @JacksonInject ChatHandlerProvider chatHandlerProvider,
+      @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(baseDataSource), "baseDataSource cannot be null or empty. Please provide a baseDataSource.");
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(baseDataSource),
+        "baseDataSource cannot be null or empty. Please provide a baseDataSource."
+    );
     this.baseDataSource = baseDataSource;
 
     this.dimensionsSpec = Preconditions.checkNotNull(
-                            dimensionsSpec, 
-                            "dimensionsSpec cannot be null. Please provide a dimensionsSpec"
-                          );
+        dimensionsSpec,
+        "dimensionsSpec cannot be null. Please provide a dimensionsSpec"
+    );
     this.aggregators = Preconditions.checkNotNull(
-                         aggregators, 
-                         "metricsSpec cannot be null. Please provide a metricsSpec"
-                       );
+        aggregators,
+        "metricsSpec cannot be null. Please provide a metricsSpec"
+    );
     this.tuningConfig = Preconditions.checkNotNull(
-                          tuningConfig, 
-                          "tuningConfig cannot be null. Please provide tuningConfig"
-                        );
-    
-    this.dataSourceName = dataSourceName == null 
+        tuningConfig,
+        "tuningConfig cannot be null. Please provide tuningConfig"
+    );
+
+    this.dataSourceName = dataSourceName == null
                           ? StringUtils.format(
-                              "%s-%s", 
-                              baseDataSource, 
-                              DigestUtils.sha1Hex(dimensionsSpec.toString()).substring(0, 8)
-                            ) 
+        "%s-%s",
+        baseDataSource,
+        DigestUtils.sha1Hex(dimensionsSpec.toString()).substring(0, 8)
+    )
                           : dataSourceName;
     this.hadoopCoordinates = hadoopCoordinates;
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
@@ -141,6 +147,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.config = config;
+    this.supervisorStateManagerConfig = supervisorStateManagerConfig;
     this.suspended = suspended != null ? suspended : false;
 
     this.metrics = new HashSet<>();
@@ -152,11 +159,11 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
       dimensions.add(schema.getName());
     }
   }
-  
+
   public HadoopIndexTask createTask(Interval interval, String version, List<DataSegment> segments)
   {
     String taskId = StringUtils.format("%s_%s_%s", TASK_PREFIX, dataSourceName, DateTimes.nowUtc());
-    
+
     // generate parser
     Map<String, Object> parseSpec = new HashMap<>();
     parseSpec.put("format", "timeAndDims");
@@ -164,7 +171,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
     Map<String, Object> parser = new HashMap<>();
     parser.put("type", "map");
     parser.put("parseSpec", parseSpec);
-    
+
     //generate HadoopTuningConfig
     HadoopTuningConfig tuningConfigForTask = new HadoopTuningConfig(
         tuningConfig.getWorkingPath(),
@@ -191,7 +198,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.isUseYarnRMJobStatusFallback()
     );
-    
+
     // generate granularity
     ArbitraryGranularitySpec granularitySpec = new ArbitraryGranularitySpec(
         Granularities.NONE,
@@ -207,7 +214,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
         TransformSpec.NONE,
         objectMapper
     );
-    
+
     // generate DatasourceIngestionSpec
     DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
         baseDataSource,
@@ -226,10 +233,10 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
     inputSpec.put("type", "dataSource");
     inputSpec.put("ingestionSpec", datasourceIngestionSpec);
     HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(inputSpec, null, null);
-    
+
     // generate HadoopIngestionSpec
     HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, tuningConfigForTask);
-    
+
     // generate HadoopIndexTask
     HadoopIndexTask task = new HadoopIndexTask(
         taskId,
@@ -250,24 +257,24 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
   {
     return dimensions;
   }
-  
+
   public Set<String> getMetrics()
   {
     return metrics;
   }
-  
+
   @JsonProperty("baseDataSource")
   public String getBaseDataSource()
   {
     return baseDataSource;
   }
-  
+
   @JsonProperty("dimensionsSpec")
   public DimensionsSpec getDimensionsSpec()
   {
     return dimensionsSpec;
   }
-  
+
   @JsonProperty("metricsSpec")
   public AggregatorFactory[] getMetricsSpec()
   {
@@ -279,33 +286,33 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
   {
     return tuningConfig;
   }
-  
+
   @JsonProperty("dataSource")
   public String getDataSourceName()
   {
     return dataSourceName;
   }
-  
+
   @JsonProperty("hadoopCoordinates")
   public String getHadoopCoordinates()
   {
     return hadoopCoordinates;
   }
-  
+
   @JsonProperty("hadoopDependencyCoordinates")
   public List<String> getSadoopDependencyCoordinates()
   {
     return hadoopDependencyCoordinates;
   }
-  
+
   @JsonProperty("classpathPrefix")
   public String getClasspathPrefix()
   {
     return classpathPrefix;
   }
-  
+
   @JsonProperty("context")
-  public Map<String, Object> getContext() 
+  public Map<String, Object> getContext()
   {
     return context;
   }
@@ -318,13 +325,13 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
   }
 
   @Override
-  public String getId() 
+  public String getId()
   {
     return StringUtils.format("MaterializedViewSupervisor-%s", dataSourceName);
   }
 
   @Override
-  public Supervisor createSupervisor() 
+  public Supervisor createSupervisor()
   {
     return new MaterializedViewSupervisor(
         taskMaster,
@@ -365,7 +372,8 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
         metadataStorageCoordinator,
         config,
         authorizerMapper,
-        chatHandlerProvider
+        chatHandlerProvider,
+        supervisorStateManagerConfig
     );
   }
 
@@ -391,17 +399,23 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
         metadataStorageCoordinator,
         config,
         authorizerMapper,
-        chatHandlerProvider
+        chatHandlerProvider,
+        supervisorStateManagerConfig
     );
   }
 
+  public SupervisorStateManagerConfig getSupervisorStateManagerConfig()
+  {
+    return supervisorStateManagerConfig;
+  }
+
   @Override
   public String toString()
   {
     return "MaterializedViewSupervisorSpec{" +
-        "baseDataSource=" + baseDataSource +
-        ", dimensions=" + dimensions +
-        ", metrics=" + metrics +
-        '}';
+           "baseDataSource=" + baseDataSource +
+           ", dimensions=" + dimensions +
+           ", metrics=" + metrics +
+           '}';
   }
 }
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index 3e6e1a6..46728e2 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.HadoopTuningConfig;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.SQLMetadataSegmentManager;
@@ -74,6 +75,7 @@ public class MaterializedViewSupervisorSpecTest
             .addValue(MaterializedViewTaskConfig.class, new MaterializedViewTaskConfig())
             .addValue(AuthorizerMapper.class, createMock(AuthorizerMapper.class))
             .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
+            .addValue(SupervisorStateManagerConfig.class, new SupervisorStateManagerConfig())
     );
   }
 
@@ -143,7 +145,8 @@ public class MaterializedViewSupervisorSpecTest
         null,
         new MaterializedViewTaskConfig(),
         createMock(AuthorizerMapper.class),
-        new NoopChatHandlerProvider()
+        new NoopChatHandlerProvider(),
+        new SupervisorStateManagerConfig()
     );
     MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
     Assert.assertEquals(expected.getBaseDataSource(), spec.getBaseDataSource());
@@ -239,7 +242,8 @@ public class MaterializedViewSupervisorSpecTest
         null,
         new MaterializedViewTaskConfig(),
         createMock(AuthorizerMapper.class),
-        new NoopChatHandlerProvider()
+        new NoopChatHandlerProvider(),
+        new SupervisorStateManagerConfig()
     );
   }
 
@@ -284,7 +288,8 @@ public class MaterializedViewSupervisorSpecTest
         null,
         new MaterializedViewTaskConfig(),
         createMock(AuthorizerMapper.class),
-        new NoopChatHandlerProvider()
+        new NoopChatHandlerProvider(),
+        new SupervisorStateManagerConfig()
     );
   }
 }
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 1bf1c39..3b9061d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
@@ -125,7 +126,8 @@ public class MaterializedViewSupervisorTest
         indexerMetadataStorageCoordinator,
         new MaterializedViewTaskConfig(),
         createMock(AuthorizerMapper.class),
-        createMock(ChatHandlerProvider.class)
+        createMock(ChatHandlerProvider.class),
+        new SupervisorStateManagerConfig()
     );
     supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
   }
@@ -282,7 +284,8 @@ public class MaterializedViewSupervisorTest
         indexerMetadataStorageCoordinator,
         new MaterializedViewTaskConfig(),
         createMock(AuthorizerMapper.class),
-        createMock(ChatHandlerProvider.class)
+        createMock(ChatHandlerProvider.class),
+        new SupervisorStateManagerConfig()
     );
     MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor();
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index fcd1673..e40f77a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.metadata.PasswordProvider;
@@ -41,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
@@ -63,44 +65,46 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
   @Override
   public void assign(Set<StreamPartition<Integer>> streamPartitions)
   {
-    consumer.assign(streamPartitions
-                        .stream()
-                        .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
-                        .collect(Collectors.toSet()));
+    wrapExceptions(() -> consumer.assign(streamPartitions
+                                             .stream()
+                                             .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
+                                             .collect(Collectors.toSet())));
   }
 
   @Override
   public void seek(StreamPartition<Integer> partition, Long sequenceNumber)
   {
-    consumer.seek(new TopicPartition(partition.getStream(), partition.getPartitionId()), sequenceNumber);
+    wrapExceptions(() -> consumer.seek(
+        new TopicPartition(partition.getStream(), partition.getPartitionId()),
+        sequenceNumber
+    ));
   }
 
   @Override
   public void seekToEarliest(Set<StreamPartition<Integer>> partitions)
   {
-    consumer.seekToBeginning(partitions
-                                 .stream()
-                                 .map(e -> new TopicPartition(e.getStream(), e.getPartitionId()))
-                                 .collect(Collectors.toList()));
+    wrapExceptions(() -> consumer.seekToBeginning(partitions
+                                                      .stream()
+                                                      .map(e -> new TopicPartition(e.getStream(), e.getPartitionId()))
+                                                      .collect(Collectors.toList())));
   }
 
   @Override
   public void seekToLatest(Set<StreamPartition<Integer>> partitions)
   {
-    consumer.seekToEnd(partitions
-                           .stream()
-                           .map(e -> new TopicPartition(e.getStream(), e.getPartitionId()))
-                           .collect(Collectors.toList()));
+    wrapExceptions(() -> consumer.seekToEnd(partitions
+                                                .stream()
+                                                .map(e -> new TopicPartition(e.getStream(), e.getPartitionId()))
+                                                .collect(Collectors.toList())));
   }
 
   @Override
   public Set<StreamPartition<Integer>> getAssignment()
   {
-    Set<TopicPartition> topicPartitions = consumer.assignment();
-    return topicPartitions
-        .stream()
-        .map(e -> new StreamPartition<>(e.topic(), e.partition()))
-        .collect(Collectors.toSet());
+    return wrapExceptions(() -> consumer.assignment()
+                                        .stream()
+                                        .map(e -> new StreamPartition<>(e.topic(), e.partition()))
+                                        .collect(Collectors.toSet()));
   }
 
   @Nonnull
@@ -122,9 +126,9 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
   @Override
   public Long getLatestSequenceNumber(StreamPartition<Integer> partition)
   {
-    Long currPos = consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
+    Long currPos = getPosition(partition);
     seekToLatest(Collections.singleton(partition));
-    Long nextPos = consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
+    Long nextPos = getPosition(partition);
     seek(partition, currPos);
     return nextPos;
   }
@@ -132,9 +136,9 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
   @Override
   public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
   {
-    Long currPos = consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
+    Long currPos = getPosition(partition);
     seekToEarliest(Collections.singleton(partition));
-    Long nextPos = consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
+    Long nextPos = getPosition(partition);
     seek(partition, currPos);
     return nextPos;
   }
@@ -142,17 +146,22 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
   @Override
   public Long getPosition(StreamPartition<Integer> partition)
   {
-    return consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
+    return wrapExceptions(() -> consumer.position(new TopicPartition(
+        partition.getStream(),
+        partition.getPartitionId()
+    )));
   }
 
   @Override
   public Set<Integer> getPartitionIds(String stream)
   {
-    List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-    if (partitions == null) {
-      throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
-    }
-    return partitions.stream().map(PartitionInfo::partition).collect(Collectors.toSet());
+    return wrapExceptions(() -> {
+      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
+      if (partitions == null) {
+        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
+      }
+      return partitions.stream().map(PartitionInfo::partition).collect(Collectors.toSet());
+    });
   }
 
   @Override
@@ -205,4 +214,21 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
     }
   }
 
+  private static <T> T wrapExceptions(Callable<T> callable)
+  {
+    try {
+      return callable.call();
+    }
+    catch (Exception e) {
+      throw new StreamException(e);
+    }
+  }
+
+  private static void wrapExceptions(Runnable runnable)
+  {
+    wrapExceptions(() -> {
+      runnable.run();
+      return null;
+    });
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b2924ea..5d419a4 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -191,7 +191,11 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
         includeOffsets ? partitionLag : null,
         includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null,
         includeOffsets ? sequenceLastUpdated : null,
-        spec.isSuspended()
+        spec.isSuspended(),
+        stateManager.isHealthy(),
+        stateManager.getSupervisorState().getBasicState(),
+        stateManager.getSupervisorState(),
+        stateManager.getExceptionEvents()
     );
   }
 
@@ -381,4 +385,10 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
   {
     return spec.getIoConfig();
   }
+
+  @Override
+  public Boolean isHealthy()
+  {
+    return stateManager.isHealthy();
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index d5f4efa..768468c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -19,15 +19,16 @@
 
 package org.apache.druid.indexing.kafka.supervisor;
 
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Map;
 
 public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReportPayload<Integer, Long>
 {
-
   public KafkaSupervisorReportPayload(
       String dataSource,
       String topic,
@@ -38,7 +39,11 @@ public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReport
       @Nullable Map<Integer, Long> minimumLag,
       @Nullable Long aggregateLag,
       @Nullable DateTime offsetsLastUpdated,
-      boolean suspended
+      boolean suspended,
+      boolean healthy,
+      SupervisorStateManager.State state,
+      SupervisorStateManager.State detailedState,
+      List<SupervisorStateManager.ExceptionEvent> recentErrors
   )
   {
     super(
@@ -51,11 +56,14 @@ public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReport
         minimumLag,
         aggregateLag,
         offsetsLastUpdated,
-        suspended
+        suspended,
+        healthy,
+        state,
+        detailedState,
+        recentErrors
     );
   }
 
-
   @Override
   public String toString()
   {
@@ -71,8 +79,11 @@ public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReport
            (getMinimumLag() != null ? ", minimumLag=" + getMinimumLag() : "") +
            (getAggregateLag() != null ? ", aggregateLag=" + getAggregateLag() : "") +
            (getOffsetsLastUpdated() != null ? ", sequenceLastUpdated=" + getOffsetsLastUpdated() : "") +
-           ", suspended=" + getSuspended() +
+           ", suspended=" + isSuspended() +
+           ", healthy=" + isHealthy() +
+           ", state=" + getState() +
+           ", detailedState=" + getDetailedState() +
+           ", recentErrors=" + getRecentErrors() +
            '}';
   }
-
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 33ad3e1..ef6259f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -39,7 +40,6 @@ import java.util.Map;
 
 public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
 {
-
   @JsonCreator
   public KafkaSupervisorSpec(
       @JsonProperty("dataSchema") DataSchema dataSchema,
@@ -54,7 +54,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
       @JacksonInject @Json ObjectMapper mapper,
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
     super(
@@ -96,7 +97,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
         mapper,
         emitter,
         monitorSchedulerConfig,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig
     );
   }
 
@@ -144,7 +146,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
         mapper,
         emitter,
         monitorSchedulerConfig,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 783af47..6c42b3b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -831,15 +831,8 @@ public class KafkaIndexTaskTest
     // as soon as any segment has more than one record, incremental publishing should happen
     maxRowsPerSegment = 2;
 
-    // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      kafkaProducer.initTransactions();
-      kafkaProducer.beginTransaction();
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-      kafkaProducer.commitTransaction();
-    }
+    insertData();
+
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
@@ -2301,15 +2294,7 @@ public class KafkaIndexTaskTest
     maxRowsPerSegment = Integer.MAX_VALUE;
     maxTotalRows = null;
 
-    // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      kafkaProducer.initTransactions();
-      kafkaProducer.beginTransaction();
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-      kafkaProducer.commitTransaction();
-    }
+    insertData();
 
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 5820047..dce3766 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -160,6 +160,7 @@ public class KafkaSamplerSpecTest
         null,
         null,
         null,
+        null,
         null
     );
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index b4c6dfd..1506d98 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -56,6 +57,7 @@ public class KafkaSupervisorSpecTest
             .addValue(ServiceEmitter.class, new NoopServiceEmitter())
             .addValue(DruidMonitorSchedulerConfig.class, null)
             .addValue(RowIngestionMetersFactory.class, null)
+            .addValue(SupervisorStateManagerConfig.class, null)
             .addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
     );
     mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 8686fc9..2eff41c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -61,10 +61,13 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -144,7 +147,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
   private final int numThreads;
 
-  private KafkaSupervisor supervisor;
+  private TestableKafkaSupervisor supervisor;
   private KafkaSupervisorTuningConfig tuningConfig;
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
@@ -155,6 +158,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private String topic;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private ExceptionCapturingServiceEmitter serviceEmitter;
+  private SupervisorStateManagerConfig supervisorConfig;
 
   private static String getTopic()
   {
@@ -237,6 +241,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
     serviceEmitter = new ExceptionCapturingServiceEmitter();
     EmittingLogger.registerEmitter(serviceEmitter);
+    supervisorConfig = new SupervisorStateManagerConfig();
   }
 
   @After
@@ -616,7 +621,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
-  @Test(expected = ISE.class)
+  @Test
   public void testBadMetadataOffsets() throws Exception
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -637,6 +642,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     supervisor.start();
     supervisor.runInternal();
+
+    Assert.assertEquals(
+        "org.apache.druid.java.util.common.ISE",
+        supervisor.getStateManager().getExceptionEvents().get(0).getExceptionClass()
+    );
   }
 
   @Test
@@ -1252,12 +1262,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(NUM_PARTITIONS, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(topic, payload.getStream());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
@@ -1358,12 +1370,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(NUM_PARTITIONS, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(topic, payload.getStream());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
@@ -1495,12 +1509,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(NUM_PARTITIONS, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(topic, payload.getStream());
     Assert.assertEquals(1, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData activeReport = payload.getActiveTasks().get(0);
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@@ -2107,6 +2123,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final DateTime startTime = DateTimes.nowUtc();
     supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null);
+    supervisor.getStateManager().markRunFinished();
+
     //not adding any events
     Task id1 = createKafkaIndexTask(
         "id1",
@@ -2203,6 +2221,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
     supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null);
+    supervisor.getStateManager().markRunFinished();
+
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2420,6 +2440,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
     supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null);
+    supervisor.getStateManager().markRunFinished();
+
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -3126,7 +3148,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
   }
 
-  private KafkaSupervisor getTestableSupervisor(
+  private TestableKafkaSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3147,7 +3169,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
-  private KafkaSupervisor getTestableSupervisor(
+  private TestableKafkaSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3216,7 +3238,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             objectMapper,
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
-            rowIngestionMetersFactory
+            rowIngestionMetersFactory,
+            new SupervisorStateManagerConfig()
         ),
         rowIngestionMetersFactory
     );
@@ -3225,7 +3248,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   /**
    * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent()
    */
-  private KafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
+  private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3295,7 +3318,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             objectMapper,
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
-            rowIngestionMetersFactory
+            rowIngestionMetersFactory,
+            supervisorConfig
         ),
         rowIngestionMetersFactory,
         isTaskCurrentReturn
@@ -3378,7 +3402,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             objectMapper,
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
-            rowIngestionMetersFactory
+            rowIngestionMetersFactory,
+            supervisorConfig
         ),
         rowIngestionMetersFactory
     );
@@ -3565,6 +3590,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
       final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
       return StringUtils.format("sequenceName-%d", groupId);
     }
+
+    private SeekableStreamSupervisorStateManager getStateManager()
+    {
+      return stateManager;
+    }
   }
 
   private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index a05eade..a55a572 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -45,6 +45,7 @@ import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.aws.AWSCredentialsUtils;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -67,6 +68,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -579,12 +581,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    checkIfClosed();
-    return kinesis.describeStream(stream)
-                  .getStreamDescription()
-                  .getShards()
-                  .stream()
-                  .map(Shard::getShardId).collect(Collectors.toSet());
+    return wrapExceptions(() -> kinesis.describeStream(stream)
+                                       .getStreamDescription()
+                                       .getShards()
+                                       .stream()
+                                       .map(Shard::getShardId).collect(Collectors.toSet()));
   }
 
   @Override
@@ -624,12 +625,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
         sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
     );
 
-    resource.shardIterator = kinesis.getShardIterator(
+    resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
         partition.getStream(),
         partition.getPartitionId(),
         iteratorEnum.toString(),
         sequenceNumber
-    ).getShardIterator();
+    ).getShardIterator());
 
     checkPartitionsStarted = true;
   }
@@ -655,10 +656,10 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
 
     // filter records in buffer and only retain ones whose partition was not seeked
     BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
-    records
-        .stream()
-        .filter(x -> !partitions.contains(x.getStreamPartition()))
-        .forEachOrdered(newQ::offer);
+
+    records.stream()
+           .filter(x -> !partitions.contains(x.getStreamPartition()))
+           .forEachOrdered(newQ::offer);
 
     records = newQ;
 
@@ -670,20 +671,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
   @Nullable
   private String getSequenceNumberInternal(StreamPartition<String> partition, ShardIteratorType iteratorEnum)
   {
-
-    String shardIterator = null;
-    try {
-      shardIterator = kinesis.getShardIterator(
-          partition.getStream(),
-          partition.getPartitionId(),
-          iteratorEnum.toString()
-      ).getShardIterator();
-    }
-    catch (ResourceNotFoundException e) {
-      log.warn(e, "Caught ResourceNotFoundException while getting shardIterator");
-    }
-
-    return getSequenceNumberInternal(partition, shardIterator);
+    return wrapExceptions(() -> getSequenceNumberInternal(
+        partition,
+        kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
+               .getShardIterator()
+    ));
   }
 
   @Nullable
@@ -774,6 +766,16 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
     }
   }
 
+  private static <T> T wrapExceptions(Callable<T> callable)
+  {
+    try {
+      return callable.call();
+    }
+    catch (Exception e) {
+      throw new StreamException(e);
+    }
+  }
+
   @VisibleForTesting
   public int bufferSize()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 870d7ec..39619a2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -247,7 +247,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        spec.isSuspended()
+        spec.isSuspended(),
+        stateManager.isHealthy(),
+        stateManager.getSupervisorState().getBasicState(),
+        stateManager.getSupervisorState(),
+        stateManager.getExceptionEvents()
     );
   }
 
@@ -312,4 +316,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
   {
     return true;
   }
+
+  @Override
+  public Boolean isHealthy()
+  {
+    return stateManager.isHealthy();
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java
index fb08337..9a4ee86 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java
@@ -19,9 +19,11 @@
 
 package org.apache.druid.indexing.kinesis.supervisor;
 
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
 
 import java.util.Collections;
+import java.util.List;
 
 public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorReportPayload<String, String>
 {
@@ -31,7 +33,11 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
       Integer partitions,
       Integer replicas,
       Long durationSeconds,
-      boolean suspended
+      boolean suspended,
+      boolean healthy,
+      SupervisorStateManager.State state,
+      SupervisorStateManager.State detailedState,
+      List<SupervisorStateManager.ExceptionEvent> recentErrors
   )
   {
     super(
@@ -44,7 +50,11 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
         Collections.emptyMap(),
         null,
         null,
-        suspended
+        suspended,
+        healthy,
+        state,
+        detailedState,
+        recentErrors
     );
   }
 
@@ -59,7 +69,11 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
            ", durationSeconds=" + getDurationSeconds() +
            ", active=" + getActiveTasks() +
            ", publishing=" + getPublishingTasks() +
-           ", suspended=" + getSuspended() +
+           ", suspended=" + isSuspended() +
+           ", healthy=" + isHealthy() +
+           ", state=" + getState() +
+           ", detailedState=" + getDetailedState() +
+           ", recentErrors=" + getRecentErrors() +
            '}';
   }
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 479851f..ec72c7d 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -58,7 +59,8 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
       @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
-      @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig
+      @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig,
+      @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
     super(
@@ -106,7 +108,8 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
         mapper,
         emitter,
         monitorSchedulerConfig,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig
     );
     this.awsCredentialsConfig = awsCredentialsConfig;
   }
@@ -170,7 +173,8 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
         emitter,
         monitorSchedulerConfig,
         rowIngestionMetersFactory,
-        awsCredentialsConfig
+        awsCredentialsConfig,
+        supervisorStateManagerConfig
     );
   }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 62b1226..95f6b4d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -179,6 +179,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         null
     );
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index a06c303..c22e57d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.kinesis.supervisor;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -57,11 +56,14 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -102,9 +104,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import static org.easymock.EasyMock.anyObject;
@@ -121,16 +121,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
   private static final String stream = "stream";
-  private static String shardId1 = "1";
-  private static String shardId0 = "0";
-  private static StreamPartition<String> shard1Partition = StreamPartition.of(stream, shardId1);
-  private static StreamPartition<String> shard0Partition = StreamPartition.of(stream, shardId0);
+  private static final String shardId1 = "1";
+  private static final String shardId0 = "0";
+  private static final StreamPartition<String> shard1Partition = StreamPartition.of(stream, shardId1);
+  private static final StreamPartition<String> shard0Partition = StreamPartition.of(stream, shardId0);
 
   private static DataSchema dataSchema;
   private KinesisRecordSupplier supervisorRecordSupplier;
 
   private final int numThreads;
-  private KinesisSupervisor supervisor;
+  private TestableKinesisSupervisor supervisor;
   private KinesisSupervisorTuningConfig tuningConfig;
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
@@ -140,6 +140,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   private TaskQueue taskQueue;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private ExceptionCapturingServiceEmitter serviceEmitter;
+  private SupervisorStateManagerConfig supervisorConfig;
 
   public KinesisSupervisorTest()
   {
@@ -197,6 +198,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
     serviceEmitter = new ExceptionCapturingServiceEmitter();
     EmittingLogger.registerEmitter(serviceEmitter);
+    supervisorConfig = new SupervisorStateManagerConfig();
   }
 
   @After
@@ -557,7 +559,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     );
   }
 
-  @Test(expected = ISE.class)
+  @Test
   public void testBadMetadataOffsets() throws Exception
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -590,6 +592,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisor.start();
     supervisor.runInternal();
+
+    Assert.assertEquals(
+        "org.apache.druid.java.util.common.ISE",
+        supervisor.getStateManager().getExceptionEvents().get(0).getExceptionClass()
+    );
   }
 
   @Test
@@ -1377,12 +1384,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
     KinesisSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(2, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(2, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(stream, payload.getStream());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
@@ -1518,12 +1527,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
     KinesisSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(2, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(2, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(stream, payload.getStream());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
@@ -1706,12 +1717,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
     KinesisSupervisorReportPayload payload = report.getPayload();
 
     Assert.assertEquals(DATASOURCE, payload.getDataSource());
-    Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
-    Assert.assertEquals(2, (int) payload.getPartitions());
-    Assert.assertEquals(1, (int) payload.getReplicas());
+    Assert.assertEquals(3600L, payload.getDurationSeconds());
+    Assert.assertEquals(2, payload.getPartitions());
+    Assert.assertEquals(1, payload.getReplicas());
     Assert.assertEquals(stream, payload.getStream());
     Assert.assertEquals(1, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
+    Assert.assertEquals(0, payload.getRecentErrors().size());
 
     TaskReportData activeReport = payload.getActiveTasks().get(0);
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@@ -2247,7 +2260,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testResetNoTasks() throws Exception
+  public void testResetNoTasks()
   {
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
 
@@ -2352,7 +2365,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testResetNoDataSourceMetadata() throws Exception
+  public void testResetNoDataSourceMetadata()
   {
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -2688,8 +2701,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
 
   @Test(timeout = 60_000L)
-  public void testCheckpointForInactiveTaskGroup()
-      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  public void testCheckpointForInactiveTaskGroup() throws InterruptedException
   {
     supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
     //not adding any events
@@ -2987,10 +2999,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test(timeout = 60_000L)
-  public void testCheckpointWithNullTaskGroupId()
-      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  public void testCheckpointWithNullTaskGroupId() throws InterruptedException
   {
     supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
+    supervisor.getStateManager().markRunFinished();
+
     //not adding any events
     final Task id1 = createKinesisIndexTask(
         "id1",
@@ -3294,7 +3307,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testResetSuspended() throws Exception
+  public void testResetSuspended()
   {
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3363,8 +3376,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testDoNotKillCompatibleTasks()
-      throws InterruptedException, EntryExistsException, ExecutionException, TimeoutException, JsonProcessingException
+  public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExistsException
   {
     // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
     int numReplicas = 2;
@@ -3461,8 +3473,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testKillIncompatibleTasks()
-      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException, EntryExistsException
+  public void testKillIncompatibleTasks() throws InterruptedException, EntryExistsException
   {
     // This supervisor always returns false for isTaskCurrent -> it should kill its tasks
     int numReplicas = 2;
@@ -3699,7 +3710,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
-  private KinesisSupervisor getTestableSupervisor(
+  private TestableKinesisSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3770,13 +3781,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
             rowIngestionMetersFactory,
-            null
+            null,
+            new SupervisorStateManagerConfig()
         ),
         rowIngestionMetersFactory
     );
   }
 
-  private KinesisSupervisor getTestableSupervisor(
+  private TestableKinesisSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3798,7 +3810,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     );
   }
 
-  private KinesisSupervisor getTestableSupervisor(
+  private TestableKinesisSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3871,7 +3883,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
             rowIngestionMetersFactory,
-            null
+            null,
+            supervisorConfig
         ),
         rowIngestionMetersFactory
     );
@@ -3880,7 +3893,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   /**
    * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent()
    */
-  private KinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
+  private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -3954,7 +3967,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
             rowIngestionMetersFactory,
-            null
+            null,
+            supervisorConfig
         ),
         rowIngestionMetersFactory,
         isTaskCurrentReturn
@@ -4039,7 +4053,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
             rowIngestionMetersFactory,
-            null
+            null,
+            supervisorConfig
         ),
         rowIngestionMetersFactory,
         null
@@ -4243,6 +4258,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
     {
       return supervisorRecordSupplier;
     }
+
+    private SeekableStreamSupervisorStateManager getStateManager() // test hook: exposes the protected stateManager
+    {
+      return stateManager; // lets tests drive it directly, e.g. markRunFinished()
+    }
   }
 
   private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 20cee24..b7e5123 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -223,11 +223,11 @@ public class IndexTask extends AbstractTask implements ChatHandler
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
     if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() > 0) {
-      determinePartitionsSavedParseExceptions = new CircularBuffer<Throwable>(
+      determinePartitionsSavedParseExceptions = new CircularBuffer<>(
           ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
       );
 
-      buildSegmentsSavedParseExceptions = new CircularBuffer<Throwable>(
+      buildSegmentsSavedParseExceptions = new CircularBuffer<>(
           ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 327dd4d..b820b23 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -339,7 +339,7 @@ public class TaskQueue
   public boolean add(final Task task) throws EntryExistsException
   {
     if (taskStorage.getTask(task.getId()).isPresent()) {
-      throw new EntryExistsException(StringUtils.format("Task %s is already exists", task.getId()));
+      throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId()));
     }
 
     giant.lock();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 3168b89..56112d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -183,6 +183,12 @@ public class SupervisorManager
     return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStats());
   }
 
+  public Optional<Boolean> isSupervisorHealthy(String id) // absent if the id is unknown or isHealthy() returns null
+  {
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id); // lhs = Supervisor, rhs = its spec
+    return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.isHealthy());
+  }
+
   public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 97e0580..9d97a80 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -187,6 +187,31 @@ public class SupervisorResource
   }
 
   @GET
+  @Path("/{id}/health")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response specGetHealth(@PathParam("id") final String id) // 200 if healthy, 503 if not, 404 if absent
+  {
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          Optional<Boolean> healthy = manager.isSupervisorHealthy(id); // absent: unknown id or no health reported
+          if (!healthy.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of(
+                               "error",
+                               StringUtils.format("[%s] does not exist or health check not implemented", id)
+                           ))
+                           .build();
+          }
+          // healthy -> OK, unhealthy -> SERVICE_UNAVAILABLE, so the status code alone carries the result
+          return Response.status(healthy.get() ? Response.Status.OK : Response.Status.SERVICE_UNAVAILABLE)
+                         .entity(ImmutableMap.of("healthy", healthy.get()))
+                         .build();
+        }
+    );
+  }
+
+  @GET
   @Path("/{id}/stats")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(SupervisorResourceFilter.class)
@@ -311,7 +336,8 @@ public class SupervisorResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response specGetHistory(
       @Context final HttpServletRequest req,
-      @PathParam("id") final String id)
+      @PathParam("id") final String id
+  )
   {
     return asLeaderWithSupervisorManager(
         manager -> {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/StreamException.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/StreamException.java
new file mode 100644
index 0000000..4d2d7ce
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/StreamException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.common;
+
+public class StreamException extends RuntimeException // unchecked wrapper tagging stream-side failures
+{
+  public StreamException(Throwable t) // keeps t as the cause; message is t.toString() (null if t is null)
+  {
+    super(t);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 755f630..5e7c693 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IndexTaskClient;
 import org.apache.druid.indexing.common.TaskInfoProvider;
@@ -53,6 +54,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
@@ -63,6 +65,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -238,7 +241,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    */
   private interface Notice
   {
-    void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException;
+    void handle() throws ExecutionException, InterruptedException, TimeoutException; // caught by the notice loop
   }
 
   private static class StatsFromTaskResult
@@ -278,7 +281,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private class RunNotice implements Notice
   {
     @Override
-    public void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException
+    public void handle()
     {
       long nowTime = System.currentTimeMillis();
       if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
@@ -447,7 +450,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-
   // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
 
@@ -471,6 +473,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected final ObjectMapper sortingMapper;
   protected final List<PartitionIdType> partitionIds = new CopyOnWriteArrayList<>();
+  protected final SeekableStreamSupervisorStateManager stateManager;
   protected volatile DateTime sequenceLastUpdated;
 
 
@@ -507,7 +510,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private volatile boolean stopped = false;
   private volatile boolean lifecycleStarted = false;
 
-
   public SeekableStreamSupervisor(
       final String supervisorId,
       final TaskStorage taskStorage,
@@ -527,7 +529,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.spec = spec;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
-
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
     this.tuningConfig = spec.getTuningConfig();
@@ -536,6 +537,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.exec = Execs.singleThreaded(supervisorId);
     this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d");
     this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + "-Reporting-%d");
+    this.stateManager = new SeekableStreamSupervisorStateManager(spec.getSupervisorStateManagerConfig(), spec.isSuspended());
 
     int workerThreads = (this.tuningConfig.getWorkerThreads() != null
                          ? this.tuningConfig.getWorkerThreads()
@@ -655,6 +657,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       Preconditions.checkState(lifecycleStarted, "lifecycle not started");
 
       log.info("Beginning shutdown of [%s]", supervisorId);
+      stateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING);
 
       try {
         scheduledExec.shutdownNow(); // stop recurring executions
@@ -702,6 +705,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         log.info("[%s] has stopped", supervisorId);
       }
       catch (Exception e) {
+        stateManager.recordThrowableEvent(e);
         log.makeAlert(e, "Exception stopping [%s]", supervisorId)
            .emit();
       }
@@ -747,6 +751,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     notice.handle();
                   }
                   catch (Throwable e) {
+                    stateManager.recordThrowableEvent(e);
                     log.makeAlert(e, "SeekableStreamSupervisor[%s] failed to handle notice", dataSource)
                        .addData("noticeClass", notice.getClass().getSimpleName())
                        .emit();
@@ -754,6 +759,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 }
               }
               catch (InterruptedException e) {
+                stateManager.recordThrowableEvent(e);
                 log.info("SeekableStreamSupervisor[%s] interrupted, exiting", dataSource);
               }
             }
@@ -777,6 +783,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         );
       }
       catch (Exception e) {
+        stateManager.recordThrowableEvent(e);
         if (recordSupplier != null) {
           recordSupplier.close();
         }
@@ -884,7 +891,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return report;
   }
 
-
   @Override
   public Map<String, Map<String, Object>> getStats()
   {
@@ -1020,29 +1026,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   @VisibleForTesting
   public void runInternal()
-      throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException
   {
-    possiblyRegisterListener();
-    updatePartitionDataFromStream();
-    discoverTasks();
-    updateTaskStatus();
-    checkTaskDuration();
-    checkPendingCompletionTasks();
-    checkCurrentTaskState();
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
-    if (!spec.isSuspended()) {
-      log.info("[%s] supervisor is running.", dataSource);
-      createNewTasks();
-    } else {
-      log.info("[%s] supervisor is suspended.", dataSource);
-      gracefulShutdownInternal();
-    }
+    try { // one pass: Exceptions are recorded in stateManager and logged, not rethrown
+      possiblyRegisterListener();
 
-    if (log.isDebugEnabled()) {
-      log.debug(generateReport(true).toString());
-    } else {
-      log.info(generateReport(false).toString());
+      stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
+      if (!updatePartitionDataFromStream() && !stateManager.isAtLeastOneSuccessfulRun()) {
+        return; // if we can't connect to the stream and this is the first run, stop and wait to retry the connection
+      }
+      // once a run has succeeded, a failed partition fetch no longer aborts the pass
+      stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+      discoverTasks();
+
+      updateTaskStatus();
+
+      checkTaskDuration();
+
+      checkPendingCompletionTasks();
+
+      checkCurrentTaskState();
+
+      // if supervisor is not suspended, ensure required tasks are running
+      // if suspended, ensure tasks have been requested to gracefully stop
+      if (!spec.isSuspended()) {
+        log.info("[%s] supervisor is running.", dataSource);
+
+        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
+        createNewTasks();
+      } else {
+        log.info("[%s] supervisor is suspended.", dataSource);
+        gracefulShutdownInternal();
+      }
+      // log the generated report; generateReport(true) is used only when DEBUG is enabled
+      if (log.isDebugEnabled()) {
+        log.debug(generateReport(true).toString());
+      } else {
+        log.info(generateReport(false).toString());
+      }
+    }
+    catch (Exception e) { // NOTE(review): also catches InterruptedException without re-interrupting
+      stateManager.recordThrowableEvent(e);
+      log.warn(e, "Exception in supervisor run loop for dataSource [%s]", dataSource);
+    }
+    finally { // also runs on the early return above
+      stateManager.markRunFinished(); // presumably re-evaluates state/health for this pass - confirm
     }
   }
 
@@ -1200,8 +1227,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         );
       }
     }
-
-
   }
 
   private void killTask(final String id, String reasonFormat, Object... args)
@@ -1348,6 +1373,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                     .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
                               }
                               catch (InterruptedException | ExecutionException | TimeoutException e) {
+                                stateManager.recordThrowableEvent(e);
                                 log.warn(e, "Exception while stopping task");
                               }
                               return false;
@@ -1365,6 +1391,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                   .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
                             }
                             catch (InterruptedException | ExecutionException | TimeoutException e) {
+                              stateManager.recordThrowableEvent(e);
                               log.warn(e, "Exception while stopping task");
                             }
                             return false;
@@ -1406,6 +1433,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                         return true;
                       }
                       catch (Throwable t) {
+                        stateManager.recordThrowableEvent(t);
                         log.error(t, "Something bad while discovering task [%s]", taskId);
                         return null;
                       }
@@ -1429,7 +1457,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // make sure the checkpoints are consistent with each other and with the metadata store
 
     verifyAndMergeCheckpoints(taskGroupsToVerify.values());
-
   }
 
   private void verifyAndMergeCheckpoints(final Collection<TaskGroup> taskGroupsToVerify)
@@ -1482,6 +1509,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             futures.get(i).get();
           }
           catch (Exception e) {
+            stateManager.recordThrowableEvent(e);
             log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
             killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
             taskGroup.tasks.remove(taskId);
@@ -1783,7 +1811,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected abstract String baseTaskName();
 
-  private void updatePartitionDataFromStream()
+  private boolean updatePartitionDataFromStream()
   {
     Set<PartitionIdType> partitionIds;
     try {
@@ -1792,14 +1820,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     }
     catch (Exception e) {
-      log.warn("Could not fetch partitions for topic/stream [%s]", ioConfig.getStream());
+      stateManager.recordThrowableEvent(e);
+      log.warn("Could not fetch partitions for topic/stream [%s]: %s", ioConfig.getStream(), e.getMessage());
       log.debug(e, "full stack trace");
-      return;
+      return false;
     }
 
     if (partitionIds == null || partitionIds.size() == 0) {
-      log.warn("No partitions found for stream[%s]", ioConfig.getStream());
-      return;
+      String errMsg = StringUtils.format("No partitions found for stream [%s]", ioConfig.getStream());
+      stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
+      log.warn(errMsg);
+      return false;
     }
 
     log.debug("Found [%d] partitions for stream [%s]", partitionIds.size(), ioConfig.getStream());
@@ -1839,6 +1870,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         );
       }
     }
+
+    return true;
   }
 
   private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException
@@ -1986,6 +2019,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             // This will cause us to create a new set of tasks next cycle that will start from the sequences in
             // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
             // publishing failed and we need to re-ingest)
+            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
             return Futures.transform(
                 stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()),
                 new Function<Object, Map<PartitionIdType, SequenceOffsetType>>()
@@ -2047,6 +2081,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   throw new RuntimeException(e);
                 }
                 catch (ExecutionException e) {
+                  stateManager.recordThrowableEvent(e);
                   pauseException = e.getCause() == null ? e : e.getCause();
                 }
 
@@ -2196,6 +2231,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
 
           if (taskData.status.isFailure()) {
+            stateManager.recordCompletedTaskState(TaskState.FAILED);
             iTask.remove(); // remove failed task
             if (group.tasks.isEmpty()) {
               // if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -2208,6 +2244,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
             log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
+            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
             futures.add(
                 stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds())
             );
@@ -2292,6 +2329,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
         // remove failed tasks
         if (taskData.status.isFailure()) {
+          stateManager.recordCompletedTaskState(TaskState.FAILED);
           iTasks.remove();
           continue;
         }
@@ -2299,6 +2337,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         // check for successful tasks, and if we find one, stop all tasks in the group and remove the group so it can
         // be recreated with the next set of sequences
         if (taskData.status.isSuccess()) {
+          stateManager.recordCompletedTaskState(TaskState.SUCCESS);
           futures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId()));
           iTaskGroups.remove();
           break;
@@ -2465,18 +2504,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             resetInternal(
                 createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
             );
-            throw new ISE(
+            throw new StreamException(new ISE(
                 "Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting sequence",
                 sequence,
                 partition
-            );
+            ));
 
           } else {
-            throw new ISE(
+            throw new StreamException(new ISE(
                 "Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous sequenceNumber and start reading from a valid message by using the supervisor's reset API.",
                 sequence,
                 partition
-            );
+            ));
           }
         }
       }
@@ -2591,6 +2630,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           taskQueue.get().add(indexTask);
         }
         catch (EntryExistsException e) {
+          stateManager.recordThrowableEvent(e);
           log.error("Tried to add task [%s] but it already exists", indexTask.getId());
         }
       } else {
@@ -2648,7 +2688,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
       catch (Exception e) {
         log.warn("Could not fetch partitions for topic/stream [%s]", ioConfig.getStream());
-        throw new RuntimeException(e);
+        throw new StreamException(e);
       }
 
       Set<StreamPartition<PartitionIdType>> partitions = partitionIds
@@ -2661,7 +2701,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       updateLatestSequenceFromStream(recordSupplier, partitions);
     }
-
   }
 
   protected abstract void updateLatestSequenceFromStream(
@@ -2688,7 +2727,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return makeSequenceNumber(seq, false);
   }
 
-
   // exposed for testing for visibility into initialization state
   @VisibleForTesting
   public boolean isStarted()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java
index 29547c3..7436488 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.seekablestream.supervisor;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
 
@@ -44,6 +45,10 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
   private final Long aggregateLag;
   private final DateTime offsetsLastUpdated;
   private final boolean suspended;
+  private final boolean healthy; // exposed as JSON property "healthy" via isHealthy()
+  private final SupervisorStateManager.State state; // exposed as JSON property "state" via getState()
+  private final SupervisorStateManager.State detailedState; // exposed as JSON property "detailedState" via getDetailedState()
+  private final List<SupervisorStateManager.ExceptionEvent> recentErrors; // kept exactly as passed in; no defensive copy
 
   public SeekableStreamSupervisorReportPayload(
       String dataSource,
@@ -55,7 +60,11 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
       @Nullable Map<PartitionIdType, SequenceOffsetType> minimumLag,
       @Nullable Long aggregateLag,
       @Nullable DateTime offsetsLastUpdated,
-      boolean suspended
+      boolean suspended,
+      boolean healthy,
+      SupervisorStateManager.State state,
+      SupervisorStateManager.State detailedState,
+      List<SupervisorStateManager.ExceptionEvent> recentErrors // NOTE(review): stored by reference; callers should pass a snapshot they will not mutate
   )
   {
     this.dataSource = dataSource;
@@ -70,6 +79,10 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
     this.aggregateLag = aggregateLag;
     this.offsetsLastUpdated = offsetsLastUpdated;
     this.suspended = suspended;
+    this.healthy = healthy;
+    this.state = state;
+    this.detailedState = detailedState;
+    this.recentErrors = recentErrors;
   }
 
   public void addTask(TaskReportData data)
@@ -108,12 +121,18 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
   }
 
   @JsonProperty
-  public boolean getSuspended()
+  public boolean isSuspended() // renamed from getSuspended(); the JSON property name remains "suspended"
   {
     return suspended;
   }
 
   @JsonProperty
+  public boolean isHealthy()
+  {
+    return healthy;
+  }
+
+  @JsonProperty
   public long getDurationSeconds()
   {
     return durationSeconds;
@@ -154,4 +173,22 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
   {
     return offsetsLastUpdated;
   }
+
+  @JsonProperty
+  public SupervisorStateManager.State getState()
+  {
+    return state;
+  }
+
+  @JsonProperty
+  public SupervisorStateManager.State getDetailedState()
+  {
+    return detailedState;
+  }
+
+  @JsonProperty
+  public List<SupervisorStateManager.ExceptionEvent> getRecentErrors() // returns the constructor's list as-is
+  {
+    return recentErrors;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 2eb8858..469821a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -57,6 +58,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
   protected final ServiceEmitter emitter;
   protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private final boolean suspended;
+  protected final SupervisorStateManagerConfig supervisorStateManagerConfig; // supplied through @JacksonInject in the constructor
 
   @JsonCreator
   public SeekableStreamSupervisorSpec(
@@ -72,7 +74,8 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
       @JacksonInject @Json ObjectMapper mapper,
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
@@ -89,6 +92,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
     this.monitorSchedulerConfig = monitorSchedulerConfig;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
     this.suspended = suspended != null ? suspended : false;
+    this.supervisorStateManagerConfig = supervisorStateManagerConfig; // NOTE(review): stored without a null check, unlike dataSchema above
   }
 
   @JsonProperty
@@ -153,6 +157,11 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
     return toggleSuspend(false);
   }
 
+  public SupervisorStateManagerConfig getSupervisorStateManagerConfig() // NOTE(review): unlike isSuspended(), not annotated @JsonProperty; confirm this is intentional
+  {
+    return supervisorStateManagerConfig;
+  }
+
   @Override
   @JsonProperty("suspended")
   public boolean isSuspended()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManager.java
new file mode 100644
index 0000000..5b41fc2
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
+
+public class SeekableStreamSupervisorStateManager extends SupervisorStateManager
+{
+  public enum SeekableStreamState implements State
+  {
+    UNABLE_TO_CONNECT_TO_STREAM(false, true), // unhealthy: stream error before any successful run
+    LOST_CONTACT_WITH_STREAM(false, false), // unhealthy: stream error after at least one successful run
+
+    CONNECTING_TO_STREAM(true, true),
+    DISCOVERING_INITIAL_TASKS(true, true),
+    CREATING_TASKS(true, true); // healthy states flagged firstRunOnly (presumably only reported until the first run finishes)
+
+    private final boolean healthy;
+    private final boolean firstRunOnly;
+
+    SeekableStreamState(boolean healthy, boolean firstRunOnly)
+    {
+      this.healthy = healthy;
+      this.firstRunOnly = firstRunOnly;
+    }
+
+    @Override
+    public boolean isHealthy()
+    {
+      return healthy;
+    }
+
+    @Override
+    public boolean isFirstRunOnly()
+    {
+      return firstRunOnly;
+    }
+
+    @Override
+    public State getBasicState()
+    {
+      return healthy ? BasicState.RUNNING : BasicState.UNHEALTHY_SUPERVISOR; // collapse to the coarse basic state
+    }
+  }
+
+  public SeekableStreamSupervisorStateManager(SupervisorStateManagerConfig supervisorConfig, boolean suspended)
+  {
+    super(supervisorConfig, suspended);
+  }
+
+  @Override
+  protected State getSpecificUnhealthySupervisorState()
+  {
+    ExceptionEvent event = getRecentEventsQueue().peekLast(); // null instead of NoSuchElementException when no events are stored
+    if (event instanceof SeekableStreamExceptionEvent && ((SeekableStreamExceptionEvent) event).isStreamException()) {
+      return isAtLeastOneSuccessfulRun()
+             ? SeekableStreamState.LOST_CONTACT_WITH_STREAM
+             : SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM;
+    }
+
+    return BasicState.UNHEALTHY_SUPERVISOR; // non-stream error, or no stored event (null fails instanceof above)
+  }
+
+  @Override
+  protected ExceptionEvent buildExceptionEvent(Throwable t)
+  {
+    return new SeekableStreamExceptionEvent(t, isStoreStackTrace());
+  }
+
+  public static class SeekableStreamExceptionEvent extends ExceptionEvent
+  {
+    private final boolean streamException;
+
+    public SeekableStreamExceptionEvent(Throwable t, boolean storeStackTrace)
+    {
+      super(t, storeStackTrace);
+
+      this.streamException = ExceptionUtils.indexOfType(t, StreamException.class) != -1; // StreamException (or subclass) anywhere in the cause chain
+    }
+
+    @JsonProperty
+    public boolean isStreamException()
+    {
+      return streamException;
+    }
+
+    @Override
+    protected boolean shouldSkipException(String className)
+    {
+      return RuntimeException.class.getName().equals(className) || StreamException.class.getName().equals(className); // wrapper types; presumably the base class then reports the wrapped cause
+    }
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 3b31dec..96afde5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -69,7 +69,8 @@ public class SupervisorResourceTest extends EasyMockSupport
   {
     supervisorResource = new SupervisorResource(
         taskMaster,
-        new AuthorizerMapper(null) {
+        new AuthorizerMapper(null)
+        {
           @Override
           public Authorizer getAuthorizer(String name)
           {
@@ -92,7 +93,8 @@ public class SupervisorResourceTest extends EasyMockSupport
   @Test
   public void testSpecPost()
   {
-    SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null) {
+    SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
+    {
 
       @Override
       public List<String> getDataSources()
@@ -132,7 +134,8 @@ public class SupervisorResourceTest extends EasyMockSupport
   public void testSpecGetAll()
   {
     Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
-    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) {
+    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
+    {
 
       @Override
       public List<String> getDataSources()
@@ -140,7 +143,8 @@ public class SupervisorResourceTest extends EasyMockSupport
         return Collections.singletonList("datasource1");
       }
     };
-    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) {
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
+    {
 
       @Override
       public List<String> getDataSources()
@@ -183,7 +187,8 @@ public class SupervisorResourceTest extends EasyMockSupport
   {
     Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
 
-    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) {
+    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
+    {
 
       @Override
       public List<String> getDataSources()
@@ -191,7 +196,8 @@ public class SupervisorResourceTest extends EasyMockSupport
         return Collections.singletonList("datasource1");
       }
     };
-    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) {
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
+    {
 
       @Override
       public List<String> getDataSources()
@@ -290,9 +296,40 @@ public class SupervisorResourceTest extends EasyMockSupport
   }
 
   @Test
+  public void testSpecGetHealth()
+  {
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3); // one lookup per specGetHealth() call below
+    EasyMock.expect(supervisorManager.isSupervisorHealthy("my-id")).andReturn(Optional.of(true));
+    EasyMock.expect(supervisorManager.isSupervisorHealthy("my-id-2")).andReturn(Optional.of(false));
+    EasyMock.expect(supervisorManager.isSupervisorHealthy("my-id-3")).andReturn(Optional.absent());
+    replayAll();
+
+    Response response = supervisorResource.specGetHealth("my-id");
+
+    Assert.assertEquals(200, response.getStatus()); // healthy -> 200 OK
+    Assert.assertEquals(ImmutableMap.of("healthy", true), response.getEntity());
+
+    response = supervisorResource.specGetHealth("my-id-2");
+
+    Assert.assertEquals(503, response.getStatus()); // unhealthy -> 503 Service Unavailable
+    Assert.assertEquals(ImmutableMap.of("healthy", false), response.getEntity());
+
+    response = supervisorResource.specGetHealth("my-id-3");
+
+    Assert.assertEquals(404, response.getStatus()); // no health information (Optional.absent()) -> 404 Not Found
+    Assert.assertEquals(
+        ImmutableMap.of("error", "[my-id-3] does not exist or health check not implemented"),
+        response.getEntity()
+    );
+
+    verifyAll();
+  }
+
+  @Test
   public void testSpecSuspend()
   {
-    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
+    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true)
+    {
       @Override
       public List<String> getDataSources()
       {
@@ -329,7 +366,8 @@ public class SupervisorResourceTest extends EasyMockSupport
   @Test
   public void testSpecResume()
   {
-    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
+    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false)
+    {
       @Override
       public List<String> getDataSources()
       {
@@ -872,8 +910,14 @@ public class SupervisorResourceTest extends EasyMockSupport
     Capture<String> id1 = Capture.newInstance();
     Capture<String> id2 = Capture.newInstance();
     EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
-    EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id1), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(true);
-    EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id2), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(false);
+    EasyMock.expect(supervisorManager.resetSupervisor(
+        EasyMock.capture(id1),
+        EasyMock.anyObject(DataSourceMetadata.class)
+    )).andReturn(true); // first reset succeeds
+    EasyMock.expect(supervisorManager.resetSupervisor(
+        EasyMock.capture(id2),
+        EasyMock.anyObject(DataSourceMetadata.class)
+    )).andReturn(false); // second reset reports failure
     replayAll();
 
     Response response = supervisorResource.reset("my-id");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
new file mode 100644
index 0000000..f65fccf
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Pair;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSupervisorStateManagerTest
+{
+  private SeekableStreamSupervisorStateManager stateManager;
+  private SupervisorStateManagerConfig config;
+  private ObjectMapper defaultMapper;
+
+  @Before
+  public void setupTest()
+  {
+    config = new SupervisorStateManagerConfig(10);
+    stateManager = new SeekableStreamSupervisorStateManager(config, false);
+    defaultMapper = new DefaultObjectMapper();
+  }
+
+  @Test
+  public void testHappyPath()
+  {
+    Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
+    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.markRunFinished();
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+
+    stateManager.maybeSetState(BasicState.PENDING);
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.CONNECTING_TO_STREAM); // first-run-only states no longer apply after a finished run
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.markRunFinished();
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+  }
+
+  @Test
+  public void testStreamFailureLostContact()
+  {
+    stateManager.markRunFinished(); // clean run without errors
+
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+
+    for (int i = 0; i < config.getUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
+
+    stateManager.getExceptionEvents().forEach(x -> {
+      Assert.assertTrue(((SeekableStreamExceptionEvent) x).isStreamException());
+      Assert.assertEquals(IllegalStateException.class.getName(), x.getExceptionClass());
+    });
+  }
+
+  @Test
+  public void testStreamFailureUnableToConnect()
+  {
+    stateManager.maybeSetState(SeekableStreamState.CONNECTING_TO_STREAM);
+    for (int i = 0; i < config.getUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
+      stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
+
+    stateManager.getExceptionEvents().forEach(x -> {
+      Assert.assertTrue(((SeekableStreamExceptionEvent) x).isStreamException());
+      Assert.assertEquals(IllegalStateException.class.getName(), x.getExceptionClass());
+    });
+  }
+
+  @Test
+  public void testNonStreamUnhealthiness()
+  {
+    stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+    for (int i = 0; i < config.getUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
+      stateManager.recordThrowableEvent(new NullPointerException("oof"));
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
+
+    stateManager.getExceptionEvents().forEach(x -> {
+      Assert.assertFalse(((SeekableStreamExceptionEvent) x).isStreamException());
+      Assert.assertEquals(NullPointerException.class.getName(), x.getExceptionClass());
+    });
+  }
+
+  @Test
+  public void testTransientUnhealthiness()
+  {
+    stateManager.markRunFinished();
+    for (int j = 1; j < 3; j++) {
+      for (int i = 0; i < config.getUnhealthinessThreshold() - 1; i++) {
+        stateManager.recordThrowableEvent(new NullPointerException("oof"));
+        stateManager.markRunFinished();
+        Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      }
+
+      stateManager.markRunFinished(); // clean run
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+      Assert.assertEquals(j * (config.getUnhealthinessThreshold() - 1), stateManager.getExceptionEvents().size());
+    }
+  }
+
+  @Test
+  public void testNonTransientTaskUnhealthiness()
+  {
+    stateManager.markRunFinished();
+    for (int i = 0; i < config.getTaskUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      stateManager.recordCompletedTaskState(TaskState.FAILED);
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(0, stateManager.getExceptionEvents().size());
+  }
+
+  @Test
+  public void testTransientTaskUnhealthiness()
+  {
+    // Only half are failing
+    stateManager.markRunFinished();
+    for (int i = 0; i < config.getTaskUnhealthinessThreshold() + 3; i++) {
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      stateManager.recordCompletedTaskState(TaskState.FAILED);
+      stateManager.recordCompletedTaskState(TaskState.SUCCESS);
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(0, stateManager.getExceptionEvents().size());
+  }
+
+  @Test
+  public void testSupervisorRecoveryWithHealthinessThreshold()
+  {
+    // Put into an unhealthy state
+    for (int i = 0; i < config.getUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
+      stateManager.recordThrowableEvent(new Exception("Except the inevitable"));
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
+
+    // Recover after config.healthinessThreshold consecutive runs that record no exception
+    for (int i = 0; i < config.getHealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
+
+    stateManager.getExceptionEvents().forEach(x -> {
+      Assert.assertFalse(((SeekableStreamExceptionEvent) x).isStreamException());
+      Assert.assertEquals(Exception.class.getName(), x.getExceptionClass());
+    });
+  }
+
+  @Test
+  public void testTaskRecoveryWithHealthinessThreshold()
+  {
+    stateManager.markRunFinished();
+
+    // Put into an unhealthy state
+    for (int i = 0; i < config.getTaskUnhealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+      stateManager.recordCompletedTaskState(TaskState.FAILED);
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
+
+    // Recover after config.taskHealthinessThreshold successful task completions
+    for (int i = 0; i < config.getTaskHealthinessThreshold(); i++) {
+      Assert.assertEquals(BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
+      stateManager.recordCompletedTaskState(TaskState.SUCCESS);
+      stateManager.markRunFinished();
+    }
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+  }
+
+  @Test
+  public void testTwoUnhealthyStates()
+  {
+    stateManager.markRunFinished();
+
+    for (int i = 0; i < Math.max(config.getTaskUnhealthinessThreshold(), config.getUnhealthinessThreshold()); i++) {
+      stateManager.recordThrowableEvent(new NullPointerException("somebody goofed"));
+      stateManager.recordCompletedTaskState(TaskState.FAILED);
+      stateManager.markRunFinished();
+    }
+    // UNHEALTHY_SUPERVISOR should take priority over UNHEALTHY_TASKS
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
+  }
+
+  @Test
+  public void testGetThrowableEvents()
+  {
+    List<Exception> exceptions = ImmutableList.of(
+        new StreamException(new UnsupportedOperationException("oof")),
+        new NullPointerException("oof"),
+        new RuntimeException(new StreamException(new Exception("oof"))),
+        new RuntimeException(new IllegalArgumentException("oof"))
+    );
+    for (Exception exception : exceptions) {
+      stateManager.recordThrowableEvent(exception);
+      stateManager.markRunFinished();
+    }
+
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
+
+    List<Pair<String, Boolean>> expected = ImmutableList.of(
+        Pair.of("java.lang.UnsupportedOperationException", true),
+        Pair.of("java.lang.NullPointerException", false),
+        Pair.of("java.lang.Exception", true),
+        Pair.of("java.lang.IllegalArgumentException", false)
+    );
+
+    Iterator<SupervisorStateManager.ExceptionEvent> it = stateManager.getExceptionEvents().iterator();
+    expected.forEach(x -> {
+      SupervisorStateManager.ExceptionEvent event = it.next();
+      Assert.assertNotNull(event.getMessage());
+      Assert.assertEquals(x.lhs, event.getExceptionClass());
+      Assert.assertEquals(x.rhs, ((SeekableStreamExceptionEvent) event).isStreamException());
+    });
+
+    Assert.assertFalse(it.hasNext());
+  }
+
+  @Test
+  public void testExceptionEventSerde() throws IOException
+  {
+    SupervisorStateManager.ExceptionEvent event =
+        new SupervisorStateManager.ExceptionEvent(new NullPointerException("msg"), true);
+
+    String serialized = defaultMapper.writeValueAsString(event);
+
+    Map<String, String> deserialized = defaultMapper.readValue(serialized, new TypeReference<Map<String, String>>()
+    {
+    });
+    Assert.assertNotNull(deserialized.get("timestamp"));
+    Assert.assertEquals("java.lang.NullPointerException", deserialized.get("exceptionClass"));
+    Assert.assertFalse(Boolean.parseBoolean(deserialized.get("streamException"))); // parseBoolean, not getBoolean (which reads a system property)
+    Assert.assertNotNull(deserialized.get("message"));
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
new file mode 100644
index 0000000..1058eb0
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+
+public class SeekableStreamSupervisorStateTest extends EasyMockSupport
+{
+  // JSON mapper used to convert the parser spec into a Map and handed to the DataSchema in getDataSchema().
+  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final String DATASOURCE = "testDS";
+  private static final String STREAM = "stream";
+  private static final String SHARD_ID = "0";
+  // The single partition the mocked record supplier reports as assigned (see setupTest()).
+  private static final StreamPartition<String> shard0Partition = StreamPartition.of(STREAM, SHARD_ID);
+  // Message carried by the exceptions that the failure tests make their mocks throw.
+  private static final String EXCEPTION_MSG = "I had an exception";
+
+  // EasyMock mocks, recreated before every test by setupTest().
+  private TaskStorage taskStorage;
+  private TaskMaster taskMaster;
+  private TaskRunner taskRunner;
+  private TaskQueue taskQueue;
+  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+  private SeekableStreamIndexTaskClientFactory taskClientFactory;
+  private SeekableStreamSupervisorSpec spec;
+  private SeekableStreamIndexTaskClient indexTaskClient;
+  private RecordSupplier<String, String> recordSupplier;
+
+  // Real (non-mock) collaborators, also rebuilt by setupTest().
+  private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private SupervisorStateManagerConfig supervisorConfig;
+
+  /**
+   * Creates fresh mocks and records the expectations shared by every test. Each test adds its own expectations
+   * before calling {@code replayAll()}.
+   */
+  @Before
+  public void setupTest()
+  {
+    taskStorage = createMock(TaskStorage.class);
+    taskMaster = createMock(TaskMaster.class);
+    taskRunner = createMock(TaskRunner.class);
+    taskQueue = createMock(TaskQueue.class);
+    indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class);
+    taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class);
+    spec = createMock(SeekableStreamSupervisorSpec.class);
+    indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
+
+    // createMock() can only return the raw RecordSupplier type; confine the unavoidable unchecked conversion to
+    // this single local declaration instead of an unannotated cast.
+    @SuppressWarnings("unchecked")
+    RecordSupplier<String, String> mockRecordSupplier = createMock(RecordSupplier.class);
+    recordSupplier = mockRecordSupplier;
+
+    rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+
+    supervisorConfig = new SupervisorStateManagerConfig();
+
+    expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
+    expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+
+    expect(taskClientFactory.build(anyObject(), anyString(), anyInt(), anyObject(), anyLong())).andReturn(
+        indexTaskClient).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
+    // Recorded void call: expected exactly once per test.
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
+    expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(shard0Partition)).anyTimes();
+    expect(recordSupplier.getLatestSequenceNumber(anyObject())).andReturn("10").anyTimes();
+  }
+
+  /**
+   * With every dependency succeeding, the supervisor moves from PENDING to a healthy RUNNING state and stays there
+   * across repeated runs without recording exceptions.
+   */
+  @Test
+  public void testRunning() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+
+    // Started but not yet run: healthy and pending.
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // The first and every following successful run must leave the supervisor healthy and RUNNING.
+    for (int run = 0; run < 2; run++) {
+      supervisor.runInternal();
+
+      Assert.assertTrue(supervisor.stateManager.isHealthy());
+      Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+      Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+      Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+      Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+    }
+
+    verifyAll();
+  }
+
+  /**
+   * Every call to {@code recordSupplier.getPartitionIds} throws a {@link StreamException}. The supervisor stays
+   * healthy in CONNECTING_TO_STREAM for the first two failed runs and turns unhealthy (UNABLE_TO_CONNECT_TO_STREAM)
+   * on the third, recording one exception event per run.
+   */
+  @Test
+  public void testConnectingToStreamFail() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    // Connecting to the stream always fails.
+    expect(recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG)))
+                                                  .anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+
+    // Started but not yet run: healthy, pending, no exceptions.
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 1 fails: still healthy, state shows the connection attempt. The event is flagged as a stream exception,
+    // reports the wrapped cause's class, and its message has the form "<cause class>: <cause message>".
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
+    Assert.assertEquals(1, exceptionEvents.size());
+    Assert.assertTrue(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
+    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
+    Assert.assertEquals(
+        StringUtils.format("%s: %s", IllegalStateException.class.getName(), EXCEPTION_MSG),
+        exceptionEvents.get(0).getMessage()
+    );
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 2 fails: a second event is recorded, still healthy.
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 3 fails: the third consecutive failure makes the supervisor unhealthy.
+    // NOTE(review): the threshold of three presumably comes from SupervisorStateManagerConfig defaults - confirm.
+    supervisor.runInternal();
+
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    verifyAll();
+  }
+
+  /**
+   * Drives {@code getPartitionIds} through fail x3, succeed x3, fail x3, succeed x3. Each switch between healthy
+   * and unhealthy only shows up after three consecutive runs with the new outcome. Failing before any successful
+   * run reports UNABLE_TO_CONNECT_TO_STREAM; failing after one reports LOST_CONTACT_WITH_STREAM.
+   */
+  @Test
+  public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    // Expectations on the same call are consumed in recording order: 3 failures, 3 successes, 3 failures,
+    // 3 successes - twelve runInternal() calls in total.
+    expect(recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException())).times(3);
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
+    expect(recordSupplier.getPartitionIds(STREAM)).andThrow(new StreamException(new IllegalStateException())).times(3);
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+
+    // Runs 1-3 fail before any successful run; unhealthy only after the third.
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 4-6 succeed; the unhealthy state persists until the third success, then RUNNING.
+    supervisor.runInternal();
+    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    supervisor.runInternal();
+    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    supervisor.runInternal();
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 7-9 fail again; since a run has already succeeded, the unhealthy state is LOST_CONTACT_WITH_STREAM.
+    supervisor.runInternal();
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    supervisor.runInternal();
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    supervisor.runInternal();
+    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 10-12 succeed; healthy and RUNNING again after the third success.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    verifyAll();
+  }
+
+  /**
+   * Drives {@code taskStorage.getActiveTasks} through fail x3, succeed x3, fail x3. Failures here are not stream
+   * exceptions. The supervisor reports DISCOVERING_INITIAL_TASKS while failing but healthy, UNHEALTHY_SUPERVISOR
+   * after three consecutive failures, and RUNNING again only after three consecutive successes.
+   */
+  @Test
+  public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    // Consumed in recording order across nine runInternal() calls: 3 failures, 3 successes, 3 failures.
+    expect(taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).times(3);
+    expect(taskStorage.getActiveTasks()).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+
+    // Run 1 fails: healthy, one non-stream exception event carrying the raw exception message.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
+    Assert.assertEquals(1, exceptionEvents.size());
+    Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
+    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
+    Assert.assertEquals(EXCEPTION_MSG, exceptionEvents.get(0).getMessage());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 2 fails: still healthy.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 3 fails: unhealthy.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 4-5 succeed: a successful run is now recorded, but the state stays unhealthy and no events are added.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 6 succeeds: the third consecutive success restores a healthy RUNNING state.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 7-8 fail: still healthy and RUNNING.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 9 fails: the third consecutive failure makes the supervisor unhealthy again.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    verifyAll();
+  }
+
+  /**
+   * Drives {@code taskQueue.add} through fail x3, succeed x3, fail x3. The supervisor reports CREATING_TASKS while
+   * failing but healthy, UNHEALTHY_SUPERVISOR after three consecutive failures, and RUNNING again only after three
+   * consecutive successes.
+   */
+  @Test
+  public void testCreatingTasksFailRecoveryFail() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    // Consumed in recording order: 3 failures, 3 successes, 3 failures.
+    expect(taskQueue.add(anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
+    expect(taskQueue.add(anyObject())).andReturn(true).times(3);
+    expect(taskQueue.add(anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+
+    // Run 1 fails: healthy, one non-stream exception event carrying the raw exception message.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
+    Assert.assertEquals(1, exceptionEvents.size());
+    Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
+    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
+    Assert.assertEquals(EXCEPTION_MSG, exceptionEvents.get(0).getMessage());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 2 fails: still healthy.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 3 fails: unhealthy.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 4-5 succeed: a successful run is recorded, but the state stays unhealthy and no events are added.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 6 succeeds: the third consecutive success restores a healthy RUNNING state.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Runs 7-8 fail: still healthy and RUNNING.
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Run 9 fails: the third consecutive failure makes the supervisor unhealthy again.
+    supervisor.runInternal();
+    Assert.assertFalse(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    verifyAll();
+  }
+
+  /**
+   * A suspended supervisor moves from PENDING to a healthy SUSPENDED state, counts its runs as successful, and
+   * records no exceptions.
+   */
+  @Test
+  public void testSuspended() throws Exception
+  {
+    expect(spec.isSuspended()).andReturn(true).anyTimes();
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+
+    // Started but not yet run: healthy and pending.
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Every run while suspended must report the same healthy SUSPENDED state.
+    for (int run = 0; run < 2; run++) {
+      supervisor.runInternal();
+
+      Assert.assertTrue(supervisor.stateManager.isHealthy());
+      Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
+      Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
+      Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+      Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+    }
+
+    verifyAll();
+  }
+
+  /**
+   * After one successful run, {@code stop(false)} leaves the supervisor healthy in the STOPPING state and performs
+   * the expected shutdown calls on the task runner, task client and record supplier.
+   */
+  @Test
+  public void testStopping() throws Exception
+  {
+    // Void calls recorded here are each expected exactly once; verifyAll() checks that they happened.
+    recordSupplier.close();
+    indexTaskClient.close();
+    taskRunner.unregisterListener("testSupervisorId");
+
+    expect(spec.isSuspended()).andReturn(false).anyTimes();
+    expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(taskQueue.add(anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(0, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertEquals(0, supervisor.stateManager.getExceptionEvents().size());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    // Stopping must not flip the supervisor to unhealthy.
+    supervisor.stop(false);
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());
+
+    verifyAll();
+  }
+
+  /**
+   * Builds the test data schema: ISO timestamps read from a "timestamp" field, two string dimensions (dim1, dim2),
+   * a single "rows" count aggregator, and hourly segment granularity with no query granularity.
+   */
+  private static DataSchema getDataSchema()
+  {
+    List<DimensionSchema> dimensions = new ArrayList<>();
+    dimensions.add(StringDimensionSchema.create("dim1"));
+    dimensions.add(StringDimensionSchema.create("dim2"));
+
+    JSONParseSpec parseSpec = new JSONParseSpec(
+        new TimestampSpec("timestamp", "iso", null),
+        new DimensionsSpec(dimensions, null, null),
+        new JSONPathSpec(true, ImmutableList.of()),
+        ImmutableMap.of()
+    );
+    StringInputRowParser parser = new StringInputRowParser(parseSpec, StandardCharsets.UTF_8.name());
+
+    AggregatorFactory[] aggregators = {new CountAggregatorFactory("rows")};
+    UniformGranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.HOUR,
+        Granularities.NONE,
+        ImmutableList.of()
+    );
+
+    // DataSchema takes the parser as a generic Map, so convert it with the shared mapper.
+    return new DataSchema(
+        DATASOURCE,
+        objectMapper.convertValue(parser, Map.class),
+        aggregators,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+  }
+
+  /**
+   * Builds the supervisor IO config for {@link #STREAM}, instantiated through an empty anonymous subclass.
+   * The stream name comes from the shared constant so that it always matches the stream the mocked record
+   * supplier answers for ({@code getPartitionIds(STREAM)}, {@code shard0Partition}).
+   */
+  private static SeekableStreamSupervisorIOConfig getIOConfig()
+  {
+    // NOTE(review): remaining positional arguments follow SeekableStreamSupervisorIOConfig's constructor order;
+    // check that class for their meanings before changing any of them.
+    return new SeekableStreamSupervisorIOConfig(
+        STREAM,
+        1,
+        1,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null
+    )
+    {
+    };
+  }
+
+  /**
+   * Builds a stub supervisor tuning config for these tests: one worker thread, one chat thread, one chat retry,
+   * a one-minute HTTP timeout and a one-second shutdown timeout. Each call to
+   * {@code convertToTaskTuningConfig()} returns a fresh stub from {@link #newStubTaskTuningConfig()}.
+   */
+  private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return new SeekableStreamSupervisorTuningConfig()
+    {
+      @Override
+      public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+      {
+        return newStubTaskTuningConfig();
+      }
+
+      @Override
+      public Duration getShutdownTimeout()
+      {
+        return new Period("PT1S").toStandardDuration();
+      }
+
+      @Override
+      public Duration getHttpTimeout()
+      {
+        return new Period("PT1M").toStandardDuration();
+      }
+
+      @Override
+      public Long getChatRetries()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Integer getChatThreads()
+      {
+        return 1;
+      }
+
+      @Override
+      public Integer getWorkerThreads()
+      {
+        return 1;
+      }
+    };
+  }
+
+  /**
+   * Builds a task tuning config with all eighteen constructor arguments left null. Both
+   * {@code withBasePersistDirectory} and {@code toString} return null, so this is only a placeholder for tests.
+   */
+  private static SeekableStreamIndexTaskTuningConfig newStubTaskTuningConfig()
+  {
+    return new SeekableStreamIndexTaskTuningConfig(
+        null, null, null, null, null, null,
+        null, null, null, null, null, null,
+        null, null, null, null, null, null
+    )
+    {
+      @Override
+      public String toString()
+      {
+        return null;
+      }
+
+      @Override
+      public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+      {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Minimal index task for these tests. Its type is {@code "test"}, it creates no task runner (returns null), and
+   * it hands out the enclosing test's {@code recordSupplier} as its record supplier.
+   */
+  private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String>
+  {
+    public TestSeekableStreamIndexTask(
+        String taskId,
+        @Nullable TaskResource resource,
+        DataSchema schema,
+        SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+        SeekableStreamIndexTaskIOConfig<String, String> taskIoConfig,
+        @Nullable Map<String, Object> taskContext,
+        @Nullable ChatHandlerProvider handlerProvider,
+        AuthorizerMapper mapper,
+        RowIngestionMetersFactory metersFactory,
+        @Nullable String taskGroupId
+    )
+    {
+      // every argument is forwarded unchanged, in declaration order
+      super(
+          taskId,
+          resource,
+          schema,
+          taskTuningConfig,
+          taskIoConfig,
+          taskContext,
+          handlerProvider,
+          mapper,
+          metersFactory,
+          taskGroupId
+      );
+    }
+
+    @Override
+    public String getType()
+    {
+      return "test";
+    }
+
+    @Override
+    protected RecordSupplier<String, String> newTaskRecordSupplier()
+    {
+      return recordSupplier;
+    }
+
+    @Override
+    protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
+    {
+      // stub: no runner is created
+      return null;
+    }
+  }
+
+  /**
+   * Concrete supervisor over String partitions and String sequence numbers, wired to the enclosing test's
+   * collaborators. Most hooks are stubs: it reads from the test's {@code recordSupplier}, creates a single
+   * {@link TestSeekableStreamIndexTask} per request, maps every partition to task group 0, and orders sequence
+   * numbers by numeric value.
+   */
+  private class TestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String>
+  {
+    private TestSeekableStreamSupervisor()
+    {
+      super(
+          "testSupervisorId",
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          objectMapper,
+          spec,
+          rowIngestionMetersFactory,
+          false
+      );
+    }
+
+    // ---- naming and sequence markers ----
+
+    @Override
+    protected String baseTaskName()
+    {
+      return "test";
+    }
+
+    @Override
+    protected String getNotSetMarker()
+    {
+      return "NOT_SET";
+    }
+
+    @Override
+    protected String getEndOfPartitionMarker()
+    {
+      return "EOF";
+    }
+
+    @Override
+    protected boolean isEndOfShard(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+    {
+      return false;
+    }
+
+    /**
+     * Orders sequence numbers by numeric value; both operands must parse as integers for
+     * {@code compareTo} to succeed.
+     */
+    @Override
+    protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
+    {
+      return new OrderedSequenceNumber<String>(seq, isExclusive)
+      {
+        @Override
+        public int compareTo(OrderedSequenceNumber<String> o)
+        {
+          final BigInteger mine = new BigInteger(this.get());
+          final BigInteger theirs = new BigInteger(o.get());
+          return mine.compareTo(theirs);
+        }
+      };
+    }
+
+    // ---- stream access ----
+
+    @Override
+    protected RecordSupplier<String, String> setupRecordSupplier()
+    {
+      return recordSupplier;
+    }
+
+    @Override
+    protected void updateLatestSequenceFromStream(
+        RecordSupplier<String, String> supplier,
+        Set<StreamPartition<String>> partitions
+    )
+    {
+      // intentionally a no-op
+    }
+
+    @Override
+    protected int getTaskGroupIdForPartition(String partition)
+    {
+      return 0;
+    }
+
+    @Override
+    protected Map<String, String> getLagPerPartition(Map<String, String> currentOffsets)
+    {
+      return null;
+    }
+
+    // ---- task creation ----
+
+    @Override
+    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+        int groupId,
+        Map<String, String> startPartitions,
+        Map<String, String> endPartitions,
+        String baseSequenceName,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
+        Set<String> exclusiveStartSequenceNumberPartitions,
+        SeekableStreamSupervisorIOConfig ioConfig
+    )
+    {
+      final SeekableStreamStartSequenceNumbers<String, String> start =
+          new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions);
+      final SeekableStreamEndSequenceNumbers<String, String> end =
+          new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions);
+
+      return new SeekableStreamIndexTaskIOConfig<String, String>(
+          groupId,
+          baseSequenceName,
+          start,
+          end,
+          true,
+          minimumMessageTime,
+          maximumMessageTime
+      )
+      {
+      };
+    }
+
+    @Override
+    protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(
+        int replicas,
+        String baseSequenceName,
+        ObjectMapper sortingMapper,
+        TreeMap<Integer, Map<String, String>> sequenceOffsets,
+        SeekableStreamIndexTaskIOConfig taskIoConfig,
+        SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+        RowIngestionMetersFactory rowIngestionMetersFactory
+    )
+    {
+      // Always exactly one task with id "id", regardless of the requested replica count.
+      final SeekableStreamIndexTask<String, String> task = new TestSeekableStreamIndexTask(
+          "id",
+          null,
+          getDataSchema(),
+          taskTuningConfig,
+          taskIoConfig,
+          null,
+          null,
+          null,
+          rowIngestionMetersFactory,
+          null
+      );
+      return ImmutableList.of(task);
+    }
+
+    // ---- metadata ----
+
+    @Override
+    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+    {
+      return true;
+    }
+
+    @Override
+    protected boolean doesTaskTypeMatchSupervisor(Task task)
+    {
+      return true;
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+        String stream,
+        Map<String, String> map
+    )
+    {
+      return null;
+    }
+
+    // ---- reporting ----
+
+    @Override
+    protected void scheduleReporting(ScheduledExecutorService reportingExec)
+    {
+      // intentionally a no-op
+    }
+
+    @Override
+    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
+        int numPartitions,
+        boolean includeOffsets
+    )
+    {
+      // Fixed payload; numPartitions and includeOffsets are not consulted.
+      return new SeekableStreamSupervisorReportPayload<String, String>(
+          DATASOURCE,
+          STREAM,
+          1,
+          1,
+          1L,
+          null,
+          null,
+          null,
+          null,
+          false,
+          true,
+          null,
+          null,
+          null
+      )
+      {
+      };
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 03c2a43..cf3f4d5 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -44,6 +44,12 @@ public interface Supervisor
     return ImmutableMap.of();
   }
 
+  /**
+   * Reports whether this supervisor is currently healthy.
+   *
+   * @return {@code true} or {@code false} from implementations that track health, or {@code null} from this default
+   * implementation, meaning health is unknown for supervisors that do not override it
+   */
+  @Nullable
+  default Boolean isHealthy()
+  {
+    return null; // default implementation for interface compatibility; returning null since true or false is misleading
+  }
+
   void reset(DataSourceMetadata dataSourceMetadata);
 
   /**
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorModule.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorModule.java
new file mode 100644
index 0000000..d281f55
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import org.apache.druid.guice.JsonConfigProvider;
+
+/**
+ * Guice module that registers {@link SupervisorStateManagerConfig} with {@link JsonConfigProvider} under the
+ * {@code druid.supervisor} property base.
+ */
+public class SupervisorModule implements Module
+{
+  private static final String CONFIG_PROPERTY_BASE = "druid.supervisor";
+
+  @Override
+  public void configure(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, CONFIG_PROPERTY_BASE, SupervisorStateManagerConfig.class);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
new file mode 100644
index 0000000..76cf8c6
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.java.util.common.DateTimes;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Tracks the {@link State} of a supervisor together with a bounded history of recent exceptions and counters of
+ * consecutive successful/failed supervisor runs and completed tasks. State transitions toward or away from the
+ * unhealthy states are governed by the thresholds in {@link SupervisorStateManagerConfig}.
+ *
+ * NOTE(review): only {@code recentEventsQueue} is a concurrent collection; the current state, flags and counters are
+ * plain (non-volatile, unsynchronized) fields, so callers presumably confine mutation to a single thread -- confirm.
+ */
+public class SupervisorStateManager
+{
+  /**
+   * A state a supervisor can be in. {@link BasicState} provides the generic states; other implementations can
+   * supply more specific ones (see {@link #getBasicState()}).
+   */
+  public interface State
+  {
+    /**
+     * If we are in this state, is the supervisor healthy or unhealthy?
+     */
+    boolean isHealthy();
+
+    /**
+     * It may be helpful to provide more detailed state information (e.g. CONNECTING_TO_STREAM, CREATING_TASKS, etc.)
+     * during the first run of the supervisor so that if the supervisor is unable to complete the run, we have
+     * information about what stage it was in when it failed. Once the supervisor is stable, we may not be as concerned
+     * about all the stages it cycles through, and just want to know if it's healthy or unhealthy. This flag indicates
+     * if the state should only be accepted prior to having completed a successful run.
+     */
+    boolean isFirstRunOnly();
+
+    /**
+     * Returns the generic state this state corresponds to. The default returns {@code this}, which is what the
+     * {@link BasicState} values use; NOTE(review): more specific states presumably override this to map onto a
+     * basic state.
+     */
+    default State getBasicState()
+    {
+      return this;
+    }
+  }
+
+  /**
+   * Generic supervisor states. Each constant is declared as (healthy, firstRunOnly).
+   */
+  public enum BasicState implements State
+  {
+    UNHEALTHY_SUPERVISOR(false, false),
+    UNHEALTHY_TASKS(false, false),
+
+    PENDING(true, true),
+    RUNNING(true, false),
+    SUSPENDED(true, false),
+    STOPPING(true, false);
+
+    private final boolean healthy;
+    private final boolean firstRunOnly;
+
+    BasicState(boolean healthy, boolean firstRunOnly)
+    {
+      this.healthy = healthy;
+      this.firstRunOnly = firstRunOnly;
+    }
+
+    @Override
+    public boolean isHealthy()
+    {
+      return healthy;
+    }
+
+    @Override
+    public boolean isFirstRunOnly()
+    {
+      return firstRunOnly;
+    }
+  }
+
+  private final SupervisorStateManagerConfig supervisorStateManagerConfig;
+  // RUNNING, or SUSPENDED when constructed with suspended = true; proposed by markRunFinished() after every run
+  private final State healthySteadyState;
+
+  // Most recent exception events, oldest at the head; trimmed to maxStoredExceptionEvents in recordThrowableEvent()
+  private final Deque<ExceptionEvent> recentEventsQueue = new ConcurrentLinkedDeque<>();
+
+  private State supervisorState = BasicState.PENDING;
+
+  private boolean atLeastOneSuccessfulRun = false;
+  // Cleared by recordThrowableEvent(); read and reset to true by markRunFinished()
+  private boolean currentRunSuccessful = true;
+
+  // Used to determine if a low consecutiveSuccessfulRuns/consecutiveSuccessfulTasks means that the supervisor is
+  // recovering from an unhealthy state, or if the supervisor just started and hasn't run many times yet.
+  // Note: these flags are set in maybeSetState() and never cleared afterwards.
+  private boolean hasHitUnhealthinessThreshold = false;
+  private boolean hasHitTaskUnhealthinessThreshold = false;
+
+  private int consecutiveFailedRuns = 0;
+  private int consecutiveSuccessfulRuns = 0;
+  private int consecutiveFailedTasks = 0;
+  private int consecutiveSuccessfulTasks = 0;
+
+  /**
+   * @param supervisorStateManagerConfig thresholds and exception-storage settings
+   * @param suspended                    if true, the healthy steady state is SUSPENDED instead of RUNNING
+   *
+   * @throws IllegalArgumentException if maxStoredExceptionEvents is less than
+   *                                  max(healthinessThreshold, unhealthinessThreshold)
+   */
+  public SupervisorStateManager(SupervisorStateManagerConfig supervisorStateManagerConfig, boolean suspended)
+  {
+    Preconditions.checkArgument(supervisorStateManagerConfig.getMaxStoredExceptionEvents() >= Math.max(
+        supervisorStateManagerConfig.getHealthinessThreshold(),
+        supervisorStateManagerConfig.getUnhealthinessThreshold()
+    ), "maxStoredExceptionEvents must be >= to max(healthinessThreshold, unhealthinessThreshold)");
+
+    this.supervisorStateManagerConfig = supervisorStateManagerConfig;
+    this.healthySteadyState = suspended ? BasicState.SUSPENDED : BasicState.RUNNING;
+  }
+
+  /**
+   * Certain states are only valid if the supervisor hasn't had a successful iteration. This method checks if there's
+   * been at least one successful iteration, and if applicable sets supervisor state to an appropriate new state.
+   *
+   * The checks below run in order and the first one that applies decides the outcome:
+   * <ol>
+   * <li>consecutive failed runs at or above the unhealthiness threshold: switch to the specific unhealthy
+   * supervisor state, ignoring {@code proposedState};</li>
+   * <li>consecutive failed tasks at or above the task unhealthiness threshold: switch to UNHEALTHY_TASKS;</li>
+   * <li>currently unhealthy and not yet past the relevant healthiness threshold: keep the current state;</li>
+   * <li>proposal is the healthy steady state but no run has succeeded yet: keep the current state;</li>
+   * <li>otherwise accept the proposal, unless it is first-run-only and a run has already succeeded.</li>
+   * </ol>
+   *
+   * @param proposedState the state the caller would like the supervisor to move to; may be ignored
+   */
+  public void maybeSetState(State proposedState)
+  {
+    // if we're over our unhealthiness threshold, set the state to the appropriate unhealthy state
+    if (consecutiveFailedRuns >= supervisorStateManagerConfig.getUnhealthinessThreshold()) {
+      hasHitUnhealthinessThreshold = true;
+      supervisorState = getSpecificUnhealthySupervisorState();
+      return;
+    }
+
+    // if we're over our task unhealthiness threshold, set the state to UNHEALTHY_TASKS
+    if (consecutiveFailedTasks >= supervisorStateManagerConfig.getTaskUnhealthinessThreshold()) {
+      hasHitTaskUnhealthinessThreshold = true;
+      supervisorState = BasicState.UNHEALTHY_TASKS;
+      return;
+    }
+
+    // if we're currently in an unhealthy state and are below our healthiness threshold for either runs or tasks,
+    // ignore the proposed state; the healthiness threshold only applies if we've had a failure in the past
+    if (!this.supervisorState.isHealthy()
+        && ((hasHitUnhealthinessThreshold
+             && consecutiveSuccessfulRuns < supervisorStateManagerConfig.getHealthinessThreshold())
+            || (hasHitTaskUnhealthinessThreshold
+                && consecutiveSuccessfulTasks < supervisorStateManagerConfig.getTaskHealthinessThreshold()))) {
+      return;
+    }
+
+    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+    // yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM,
+    // DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.)
+    if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) {
+      return;
+    }
+
+    // accept the state if it is not a firstRunOnly state OR we are still on the first run
+    if (!proposedState.isFirstRunOnly() || !atLeastOneSuccessfulRun) {
+      supervisorState = proposedState;
+    }
+  }
+
+  /**
+   * Records an exception from the current run. The event is appended to the recent-events queue, and the oldest
+   * event is evicted if the queue then holds more than maxStoredExceptionEvents. The current run is marked
+   * unsuccessful.
+   */
+  public void recordThrowableEvent(Throwable t)
+  {
+    recentEventsQueue.add(buildExceptionEvent(t));
+
+    if (recentEventsQueue.size() > supervisorStateManagerConfig.getMaxStoredExceptionEvents()) {
+      recentEventsQueue.poll();
+    }
+
+    currentRunSuccessful = false;
+  }
+
+  /**
+   * Updates the consecutive task counters from a completed task's state. A success resets the failure streak and a
+   * failure resets the success streak. A state that is neither a success nor a failure leaves both counters
+   * unchanged.
+   */
+  public void recordCompletedTaskState(TaskState state)
+  {
+    if (state.isSuccess()) {
+      consecutiveSuccessfulTasks++;
+      consecutiveFailedTasks = 0;
+    } else if (state.isFailure()) {
+      consecutiveFailedTasks++;
+      consecutiveSuccessfulTasks = 0;
+    }
+  }
+
+  /**
+   * Closes out the current supervisor run. It updates the run counters according to whether any exception was
+   * recorded during the run, proposes the healthy steady state via {@link #maybeSetState(State)}, and resets the
+   * per-run success flag.
+   */
+  public void markRunFinished()
+  {
+    atLeastOneSuccessfulRun |= currentRunSuccessful;
+
+    consecutiveSuccessfulRuns = currentRunSuccessful ? consecutiveSuccessfulRuns + 1 : 0;
+    consecutiveFailedRuns = currentRunSuccessful ? 0 : consecutiveFailedRuns + 1;
+
+    // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
+    // (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
+    // unhealthy one if we are now over the error thresholds.
+    maybeSetState(healthySteadyState);
+
+    // reset for next run
+    currentRunSuccessful = true;
+  }
+
+  /**
+   * Returns a snapshot copy of the stored exception events, oldest first.
+   */
+  public List<ExceptionEvent> getExceptionEvents()
+  {
+    return new ArrayList<>(recentEventsQueue);
+  }
+
+  public State getSupervisorState()
+  {
+    return supervisorState;
+  }
+
+  /**
+   * Returns true if the current state reports itself as healthy.
+   */
+  public boolean isHealthy()
+  {
+    return supervisorState != null && supervisorState.isHealthy();
+  }
+
+  public boolean isAtLeastOneSuccessfulRun()
+  {
+    return atLeastOneSuccessfulRun;
+  }
+
+  /**
+   * Exposes the live (not copied) event queue to subclasses.
+   */
+  protected Deque<ExceptionEvent> getRecentEventsQueue()
+  {
+    return recentEventsQueue;
+  }
+
+  protected boolean isStoreStackTrace()
+  {
+    return supervisorStateManagerConfig.isStoreStackTrace();
+  }
+
+  /**
+   * Hook for subclasses: the state used when the run unhealthiness threshold is reached. Defaults to
+   * UNHEALTHY_SUPERVISOR.
+   */
+  protected State getSpecificUnhealthySupervisorState()
+  {
+    return BasicState.UNHEALTHY_SUPERVISOR;
+  }
+
+  /**
+   * Hook for subclasses to customize how a throwable is converted into a stored event.
+   */
+  protected ExceptionEvent buildExceptionEvent(Throwable t)
+  {
+    return new ExceptionEvent(t, isStoreStackTrace());
+  }
+
+  /**
+   * A timestamped record of an exception. Its timestamp, exception class and message are exposed as JSON properties
+   * through the annotated getters.
+   */
+  public static class ExceptionEvent
+  {
+    private final DateTime timestamp;
+    private final String exceptionClass;
+    private final String message; // contains full stackTrace if storeStackTrace is true
+
+    /**
+     * Captures the current UTC time, the first non-skipped exception class name in {@code t}'s throwable chain,
+     * and either the full stack trace (if {@code storeStackTrace}) or {@code t.getMessage()}.
+     *
+     * NOTE(review): this constructor indirectly calls the overridable {@link #shouldSkipException(String)}; a
+     * subclass override runs before the subclass's own fields are initialized.
+     */
+    public ExceptionEvent(Throwable t, boolean storeStackTrace)
+    {
+      this.timestamp = DateTimes.nowUtc();
+      this.exceptionClass = getMeaningfulExceptionClass(t);
+      this.message = storeStackTrace ? ExceptionUtils.getStackTrace(t) : t.getMessage();
+    }
+
+    @JsonProperty
+    public DateTime getTimestamp()
+    {
+      return timestamp;
+    }
+
+    @JsonProperty
+    public String getExceptionClass()
+    {
+      return exceptionClass;
+    }
+
+    @JsonProperty
+    public String getMessage()
+    {
+      return message;
+    }
+
+    /**
+     * Returns true for exception class names too generic to report. By default this is only
+     * {@code java.lang.RuntimeException} itself; the comparison is by exact name, so subclasses are not skipped.
+     */
+    protected boolean shouldSkipException(String className)
+    {
+      return RuntimeException.class.getName().equals(className);
+    }
+
+    /**
+     * Returns the first class name in {@code ExceptionUtils.getThrowableList(t)} (the throwable followed by its
+     * causes) that is not skipped, or {@code java.lang.Exception} if every entry is skipped.
+     */
+    private String getMeaningfulExceptionClass(Throwable t)
+    {
+      // the cast suggests getThrowableList returns a raw List in this commons-lang version
+      return ((List<Throwable>) ExceptionUtils.getThrowableList(t))
+          .stream()
+          .map(x -> x.getClass().getName())
+          .filter(x -> !shouldSkipException(x))
+          .findFirst()
+          .orElse(Exception.class.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
new file mode 100644
index 0000000..5dde7a3
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Configuration for {@link SupervisorStateManager}: exception-event storage and the run/task thresholds that decide
+ * when a supervisor is reported unhealthy and when it is considered recovered. Fields are bound by Jackson through
+ * their {@code @JsonProperty} annotations.
+ */
+public class SupervisorStateManagerConfig
+{
+  // Whether stored exception events carry the full stack trace instead of only the exception message
+  @JsonProperty
+  private boolean storeStackTrace = false;
+
+  // The number of failed runs before the supervisor is considered unhealthy
+  @JsonProperty
+  private int unhealthinessThreshold = 3;
+
+  // The number of successful runs before an unhealthy supervisor is again considered healthy
+  @JsonProperty
+  private int healthinessThreshold = 3;
+
+  // The number of consecutive task failures before the supervisor is considered unhealthy
+  @JsonProperty
+  private int taskUnhealthinessThreshold = 3;
+
+  // The number of consecutive task successes before an unhealthy supervisor is again considered healthy
+  @JsonProperty
+  private int taskHealthinessThreshold = 3;
+
+  // The maximum number of exception events that can be returned through the supervisor status endpoint.
+  // Null means "not configured". The default is then derived in getMaxStoredExceptionEvents() from the thresholds
+  // actually in effect. A field initializer such as Math.max(unhealthinessThreshold, healthinessThreshold) would run
+  // at construction, before Jackson injects configured values. It would then always see the built-in defaults and
+  // could end up below a raised threshold, failing SupervisorStateManager's constructor precondition.
+  @JsonProperty
+  private Integer maxStoredExceptionEvents = null;
+
+  public SupervisorStateManagerConfig()
+  {
+  }
+
+  /**
+   * Creates a config with the default thresholds and an explicit exception-event limit.
+   *
+   * @param maxStoredExceptionEvents maximum number of exception events to retain
+   */
+  public SupervisorStateManagerConfig(int maxStoredExceptionEvents)
+  {
+    this.maxStoredExceptionEvents = maxStoredExceptionEvents;
+  }
+
+  public boolean isStoreStackTrace()
+  {
+    return storeStackTrace;
+  }
+
+  public int getUnhealthinessThreshold()
+  {
+    return unhealthinessThreshold;
+  }
+
+  public int getHealthinessThreshold()
+  {
+    return healthinessThreshold;
+  }
+
+  public int getTaskUnhealthinessThreshold()
+  {
+    return taskUnhealthinessThreshold;
+  }
+
+  public int getTaskHealthinessThreshold()
+  {
+    return taskHealthinessThreshold;
+  }
+
+  /**
+   * Returns the configured exception-event limit. When none was configured, it returns the larger of the run
+   * unhealthiness and healthiness thresholds currently in effect.
+   */
+  public int getMaxStoredExceptionEvents()
+  {
+    if (maxStoredExceptionEvents != null) {
+      return maxStoredExceptionEvents;
+    }
+    return Math.max(unhealthinessThreshold, healthinessThreshold);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 375c013..bfabeed 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -80,7 +80,7 @@ import java.util.stream.Stream;
 public abstract class BaseAppenderatorDriver implements Closeable
 {
   /**
-   * Segments allocated for an intervval.
+   * Segments allocated for an interval.
    * There should be at most a single active (appending) segment at any time.
    */
   static class SegmentsOfInterval
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
index d513aeb..82a67b0 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.server.coordination;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.utils.CircularBuffer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ *
  */
 public class ChangeRequestHistoryTest
 {
@@ -43,7 +43,8 @@ public class ChangeRequestHistoryTest
     history.addChangeRequest(new SegmentChangeRequestNoop());
     Assert.assertEquals(1, history.getLastCounter().getCounter());
 
-    ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshot = history.getRequestsSince(ChangeRequestHistory.Counter.ZERO).get();
+    ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshot = history.getRequestsSince(ChangeRequestHistory.Counter.ZERO)
+                                                                       .get();
     Assert.assertEquals(1, snapshot.getRequests().size());
     Assert.assertEquals(1, snapshot.getCounter().getCounter());
 
@@ -170,57 +171,4 @@ public class ChangeRequestHistoryTest
     Assert.assertEquals(1, snapshot.getCounter().getCounter());
     Assert.assertEquals(1, snapshot.getRequests().size());
   }
-
-  @Test
-  public void testCircularBuffer()
-  {
-    CircularBuffer<Integer> circularBuffer = new CircularBuffer<>(
-        3);
-
-    circularBuffer.add(1);
-    Assert.assertEquals(1, circularBuffer.size());
-    Assert.assertEquals(1, (int) circularBuffer.get(0));
-
-    circularBuffer.add(2);
-    Assert.assertEquals(2, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(3);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(4);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 2, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(5);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 3, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(6);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 4, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(7);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 5, (int) circularBuffer.get(i));
-    }
-
-    circularBuffer.add(8);
-    Assert.assertEquals(3, circularBuffer.size());
-    for (int i = 0; i < circularBuffer.size(); i++) {
-      Assert.assertEquals(i + 6, (int) circularBuffer.get(i));
-    }
-  }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index c11031f..bd1c451 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -90,6 +90,7 @@ import org.apache.druid.indexing.overlord.http.OverlordResource;
 import org.apache.druid.indexing.overlord.sampler.SamplerModule;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorModule;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -198,7 +199,9 @@ public class CliOverlord extends ServerRunnable
             binder.bind(SupervisorManager.class).in(LazySingleton.class);
 
             binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
-            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
+            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>()
+            {
+            })
                   .toProvider(Providers.of(null));
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
 
@@ -334,6 +337,7 @@ public class CliOverlord extends ServerRunnable
         },
         new IndexingServiceFirehoseModule(),
         new IndexingServiceTaskLogsModule(),
+        new SupervisorModule(),
         new LookupSerdeModule(),
         new SamplerModule()
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org