You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2021/12/03 17:14:52 UTC

[druid] branch master updated: Enhancements to IndexTaskClient. (#12011)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e0e05aa  Enhancements to IndexTaskClient. (#12011)
e0e05aa is described below

commit e0e05aad995d82d40fb54b960120d6c7d1050932
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Dec 3 09:14:32 2021 -0800

    Enhancements to IndexTaskClient. (#12011)
    
    * Enhancements to IndexTaskClient.
    
    1) Ability to use handlers other than StringFullResponseHandler. This
       functionality is not used in production code yet, but is useful
       because it will allow tasks to communicate with each other in
       non-string-based formats and in streaming fashion. In the future,
       we'll be able to use this to make task-to-task communication
       more efficient.
    
    2) Truncate server errors at 1KB, so long errors do not pollute logs.
    
    3) Change error log level for retryable errors from WARN to INFO. (The
       final error is still WARN.)
    
    4) Harmonize log and exception messages to have a more consistent format.
    
    * Additional tests and improvements.
---
 .../org/apache/druid/java/util/common/Either.java  | 146 +++++++++++++++
 .../client/response/BytesFullResponseHandler.java  |   2 +-
 .../client/response/BytesFullResponseHolder.java   |   5 +-
 .../http/client/response/FullResponseHolder.java   |   6 +-
 .../response/InputStreamFullResponseHandler.java   |   2 +-
 .../response/InputStreamFullResponseHolder.java    |   8 +-
 .../response/ObjectOrErrorResponseHandler.java     | 180 ++++++++++++++++++
 .../client/response/StringFullResponseHandler.java |   2 +-
 .../client/response/StringFullResponseHolder.java  |   4 +-
 .../java/org/apache/druid/common/EitherTest.java   | 111 +++++++++++
 .../response/ObjectOrErrorResponseHandlerTest.java | 121 ++++++++++++
 .../indexing/kafka/KafkaIndexTaskClientTest.java   | 165 ++++++++--------
 .../kinesis/KinesisIndexTaskClientTest.java        | 165 ++++++++--------
 .../druid/indexing/common/IndexTaskClient.java     |  86 ++++++---
 .../druid/indexing/common/IndexTaskClientTest.java | 207 ++++++++++++++++++++-
 .../common/actions/RemoteTaskActionClientTest.java |   4 +-
 .../indexing/HttpIndexingServiceClientTest.java    |  10 +-
 .../query/lookup/LookupReferencesManagerTest.java  |  42 ++---
 .../coordinator/duty/CompactSegmentsTest.java      |   1 -
 .../druid/sql/calcite/schema/SystemSchemaTest.java |   7 +-
 20 files changed, 1030 insertions(+), 244 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/Either.java b/core/src/main/java/org/apache/druid/java/util/common/Either.java
new file mode 100644
index 0000000..85fc625
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/Either.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Encapsulates either an "error" or a "value".
+ *
+ * Similar to the Either class in Scala or Haskell, except the possibilities are named "error" and "value" instead of
+ * "left" and "right".
+ */
+public class Either<L, R>
+{
+  private final L error;
+  private final R value;
+
+  private Either(L error, R value)
+  {
+    this.error = error;
+    this.value = value;
+  }
+
+  public static <L, R> Either<L, R> error(final L error)
+  {
+    return new Either<>(Preconditions.checkNotNull(error, "error"), null);
+  }
+
+  public static <L, R> Either<L, R> value(@Nullable final R value)
+  {
+    return new Either<>(null, value);
+  }
+
+  public boolean isValue()
+  {
+    return error == null;
+  }
+
+  public boolean isError()
+  {
+    return error != null;
+  }
+
+  public L error()
+  {
+    if (isError()) {
+      return error;
+    } else {
+      throw new IllegalStateException("Not an error; check isError first");
+    }
+  }
+
+  /**
+   * If this Either represents a value, returns it. If this Either represents an error, throw an error.
+   *
+   * If the error is a {@link RuntimeException} or {@link Error}, it is thrown directly. If it is some other
+   * {@link Throwable}, it is wrapped in a RuntimeException and thrown. If it is not a throwable at all, a generic
+   * error is thrown containing the string representation of the error object.
+   *
+   * If you want to be able to retrieve the error as-is, use {@link #isError()} and {@link #error()} instead.
+   */
+  @Nullable
+  public R valueOrThrow()
+  {
+    if (isValue()) {
+      return value;
+    } else if (error instanceof Throwable) {
+      Throwables.propagateIfPossible((Throwable) error);
+      throw new RuntimeException((Throwable) error);
+    } else {
+      throw new RuntimeException(error.toString());
+    }
+  }
+
+  /**
+   * Applies a function to this value, if present.
+   *
+   * If the mapping function throws an exception, it is thrown by this method instead of being packed up into
+   * the returned Either.
+   *
+   * If this Either represents an error, the mapping function is not applied.
+   *
+   * @throws NullPointerException if the mapping function returns null
+   */
+  public <T> Either<L, T> map(final Function<R, T> fn)
+  {
+    if (isValue()) {
+      return Either.value(fn.apply(value));
+    } else {
+      // Safe because the value is never going to be returned.
+      //noinspection unchecked
+      return (Either<L, T>) this;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Either<?, ?> either = (Either<?, ?>) o;
+    return Objects.equals(error, either.error) && Objects.equals(value, either.value);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(error, value);
+  }
+
+  @Override
+  public String toString()
+  {
+    if (isValue()) {
+      return "Value[" + value + "]";
+    } else {
+      return "Error[" + error + "]";
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java
index 2e9c404..665f496 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java
@@ -32,7 +32,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler<BytesFullRe
   @Override
   public ClientResponse<BytesFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
-    BytesFullResponseHolder holder = new BytesFullResponseHolder(response.getStatus(), response);
+    BytesFullResponseHolder holder = new BytesFullResponseHolder(response);
 
     holder.addChunk(getContentBytes(response.getContent()));
 
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
index 8f6f233..b428031 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
@@ -20,7 +20,6 @@
 package org.apache.druid.java.util.http.client.response;
 
 import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,9 +29,9 @@ public class BytesFullResponseHolder extends FullResponseHolder<byte[]>
 {
   private final List<byte[]> chunks;
 
-  public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response)
+  public BytesFullResponseHolder(HttpResponse response)
   {
-    super(status, response);
+    super(response);
     this.chunks = new ArrayList<>();
   }
 
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
index fbbab87..27c2ed2 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
@@ -29,18 +29,16 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
  */
 public abstract class FullResponseHolder<T>
 {
-  private final HttpResponseStatus status;
   private final HttpResponse response;
 
-  public FullResponseHolder(HttpResponseStatus status, HttpResponse response)
+  public FullResponseHolder(HttpResponse response)
   {
-    this.status = status;
     this.response = response;
   }
 
   public HttpResponseStatus getStatus()
   {
-    return status;
+    return response.getStatus();
   }
 
   public HttpResponse getResponse()
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java
index 01a69a8..71f1a85 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java
@@ -32,7 +32,7 @@ public class InputStreamFullResponseHandler implements HttpResponseHandler<Input
   @Override
   public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
-    InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
+    InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response);
     holder.addChunk(getContentBytes(response.getContent()));
     return ClientResponse.finished(holder);
   }
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java
index fbabe63..2660908 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java
@@ -21,7 +21,6 @@ package org.apache.druid.java.util.http.client.response;
 
 import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
 import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import java.io.InputStream;
 
@@ -29,12 +28,9 @@ public class InputStreamFullResponseHolder extends FullResponseHolder<InputStrea
 {
   private final AppendableByteArrayInputStream is;
 
-  public InputStreamFullResponseHolder(
-      HttpResponseStatus status,
-      HttpResponse response
-  )
+  public InputStreamFullResponseHolder(HttpResponse response)
   {
-    super(status, response);
+    super(response);
     is = new AppendableByteArrayInputStream();
   }
 
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandler.java
new file mode 100644
index 0000000..c884e7f
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.http.client.response;
+
+import org.apache.druid.java.util.common.Either;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Response handler that delegates successful responses (2xx response codes) to some other handler, but returns
+ * errors (non-2xx response codes) as Strings. The return value is an {@link Either}.
+ */
+public class ObjectOrErrorResponseHandler<IntermediateType, FinalType>
+    implements HttpResponseHandler<Either<StringFullResponseHolder, IntermediateType>, Either<StringFullResponseHolder, FinalType>>
+{
+  private final HttpResponseHandler<IntermediateType, FinalType> okHandler;
+  private final StringFullResponseHandler errorHandler;
+
+  public ObjectOrErrorResponseHandler(HttpResponseHandler<IntermediateType, FinalType> okHandler)
+  {
+    this.okHandler = okHandler;
+    this.errorHandler = new StringFullResponseHandler(StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public ClientResponse<Either<StringFullResponseHolder, IntermediateType>> handleResponse(
+      final HttpResponse response,
+      final TrafficCop trafficCop
+  )
+  {
+    if (response.getStatus().getCode() / 100 == 2) {
+      final ClientResponse<IntermediateType> delegateResponse = okHandler.handleResponse(response, trafficCop);
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.value(delegateResponse.getObj())
+      );
+    } else {
+      final ClientResponse<StringFullResponseHolder> delegateResponse =
+          errorHandler.handleResponse(response, trafficCop);
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.error(delegateResponse.getObj())
+      );
+    }
+  }
+
+  @Override
+  public ClientResponse<Either<StringFullResponseHolder, IntermediateType>> handleChunk(
+      final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse,
+      final HttpChunk chunk,
+      final long chunkNum
+  )
+  {
+    final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
+
+    if (prevHolder.isValue()) {
+      final ClientResponse<IntermediateType> delegateResponse = okHandler.handleChunk(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.valueOrThrow()
+          ),
+          chunk,
+          chunkNum
+      );
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.value(delegateResponse.getObj())
+      );
+    } else {
+      final ClientResponse<StringFullResponseHolder> delegateResponse = errorHandler.handleChunk(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.error()
+          ),
+          chunk,
+          chunkNum
+      );
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.error(delegateResponse.getObj())
+      );
+    }
+  }
+
+  @Override
+  public ClientResponse<Either<StringFullResponseHolder, FinalType>> done(
+      final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse
+  )
+  {
+    final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
+
+    if (prevHolder.isValue()) {
+      final ClientResponse<FinalType> delegateResponse = okHandler.done(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.valueOrThrow()
+          )
+      );
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.value(delegateResponse.getObj())
+      );
+    } else {
+      final ClientResponse<StringFullResponseHolder> delegateResponse = errorHandler.done(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.error()
+          )
+      );
+
+      return new ClientResponse<>(
+          delegateResponse.isFinished(),
+          delegateResponse.isContinueReading(),
+          Either.error(delegateResponse.getObj())
+      );
+    }
+  }
+
+  @Override
+  public void exceptionCaught(
+      final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse,
+      final Throwable e
+  )
+  {
+    final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
+
+    if (prevHolder.isValue()) {
+      okHandler.exceptionCaught(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.valueOrThrow()
+          ),
+          e
+      );
+    } else {
+      errorHandler.exceptionCaught(
+          new ClientResponse<>(
+              clientResponse.isFinished(),
+              clientResponse.isContinueReading(),
+              prevHolder.error()
+          ),
+          e
+      );
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java
index f4176e8..57af8e1 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java
@@ -41,7 +41,7 @@ public class StringFullResponseHandler
   @Override
   public ClientResponse<StringFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
-    return ClientResponse.unfinished(new StringFullResponseHolder(response.getStatus(), response, charset));
+    return ClientResponse.unfinished(new StringFullResponseHolder(response, charset));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
index 3fe2e08..457c6e2 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
@@ -20,7 +20,6 @@
 package org.apache.druid.java.util.http.client.response;
 
 import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import java.nio.charset.Charset;
 
@@ -29,12 +28,11 @@ public class StringFullResponseHolder extends FullResponseHolder<String>
   private final StringBuilder builder;
 
   public StringFullResponseHolder(
-      HttpResponseStatus status,
       HttpResponse response,
       Charset charset
   )
   {
-    super(status, response);
+    super(response);
     this.builder = new StringBuilder(response.getContent().toString(charset));
   }
 
diff --git a/core/src/test/java/org/apache/druid/common/EitherTest.java b/core/src/test/java/org/apache/druid/common/EitherTest.java
new file mode 100644
index 0000000..a91908e
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/common/EitherTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EitherTest
+{
+  @Test
+  public void testValueString()
+  {
+    final Either<String, String> either = Either.value("yay");
+
+    Assert.assertFalse(either.isError());
+    Assert.assertTrue(either.isValue());
+    Assert.assertEquals("yay", either.valueOrThrow());
+
+    final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error);
+    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error"));
+
+    // Test toString.
+    Assert.assertEquals("Value[yay]", either.toString());
+
+    // Test map.
+    Assert.assertEquals(Either.value("YAY"), either.map(StringUtils::toUpperCase));
+  }
+
+  @Test
+  public void testValueNull()
+  {
+    final Either<String, String> either = Either.value(null);
+
+    Assert.assertFalse(either.isError());
+    Assert.assertTrue(either.isValue());
+    Assert.assertNull(either.valueOrThrow());
+
+    final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error);
+    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error"));
+
+    // Test toString.
+    Assert.assertEquals("Value[null]", either.toString());
+
+    // Test map.
+    Assert.assertEquals(Either.value("nullxyz"), either.map(s -> s + "xyz"));
+  }
+
+  @Test
+  public void testErrorString()
+  {
+    final Either<String, Object> either = Either.error("oh no");
+
+    Assert.assertTrue(either.isError());
+    Assert.assertFalse(either.isValue());
+    Assert.assertEquals("oh no", either.error());
+
+    final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow);
+    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
+
+    // Test toString.
+    Assert.assertEquals("Error[oh no]", either.toString());
+
+    // Test map.
+    Assert.assertEquals(either, either.map(o -> "this does nothing because the Either is an error"));
+  }
+
+  @Test
+  public void testErrorThrowable()
+  {
+    final Either<Throwable, Object> either = Either.error(new AssertionError("oh no"));
+
+    Assert.assertTrue(either.isError());
+    Assert.assertFalse(either.isValue());
+    MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class));
+    MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no"));
+
+    final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow);
+    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
+
+    // Test toString.
+    Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString());
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(Either.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java b/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java
new file mode 100644
index 0000000..4e2247c
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.http.client.response;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class ObjectOrErrorResponseHandlerTest
+{
+  @Test
+  public void testOk() throws Exception
+  {
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    response.setChunked(false);
+    response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
+
+    final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
+        new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
+
+    ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
+        responseHandler.handleResponse(response, null);
+
+    DefaultHttpChunk chunk =
+        new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
+    clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
+    clientResp = responseHandler.done(clientResp);
+
+    Assert.assertTrue(clientResp.isFinished());
+    Assert.assertEquals(
+        "abcdefg",
+        IOUtils.toString(clientResp.getObj().valueOrThrow().getContent(), StandardCharsets.UTF_8)
+    );
+  }
+
+  @Test
+  public void testExceptionAfterOk() throws Exception
+  {
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    response.setChunked(false);
+    response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
+
+    final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
+        new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
+
+    ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
+        responseHandler.handleResponse(response, null);
+
+    Exception ex = new RuntimeException("dummy!");
+    responseHandler.exceptionCaught(clientResp, ex);
+
+    // Exception after HTTP OK still is handled by the "OK handler"
+    // (The handler that starts the request gets to finish it.)
+    Assert.assertTrue(clientResp.isFinished());
+    Assert.assertTrue(clientResp.getObj().isValue());
+
+    final InputStream responseStream = clientResp.getObj().valueOrThrow().getContent();
+    final IOException e = Assert.assertThrows(
+        IOException.class,
+        () -> IOUtils.toString(responseStream, StandardCharsets.UTF_8)
+    );
+    Assert.assertEquals("java.lang.RuntimeException: dummy!", e.getMessage());
+  }
+
+  @Test
+  public void testServerError() throws Exception
+  {
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+    response.setChunked(false);
+    response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
+
+    final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
+        new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
+
+    ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
+        responseHandler.handleResponse(response, null);
+
+    DefaultHttpChunk chunk =
+        new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
+    clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
+    clientResp = responseHandler.done(clientResp);
+
+    // 5xx HTTP code is handled by the error handler.
+    Assert.assertTrue(clientResp.isFinished());
+    Assert.assertTrue(clientResp.getObj().isError());
+    Assert.assertEquals(
+        HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode(),
+        clientResp.getObj().error().getResponse().getStatus().getCode()
+    );
+    Assert.assertEquals("abcdefg", clientResp.getObj().error().getContent());
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java
index db26a9b..3646b4a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java
@@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.hamcrest.CoreMatchers;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -182,18 +183,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   public void testInternalServerError()
   {
     expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []");
+    expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
+    expectedException.expectMessage("Received server error with status [500 Internal Server Error]");
 
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
     EasyMock.expect(responseHolder.getContent()).andReturn("");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
+        errorResponseHolder()
     );
     replayAll();
 
@@ -204,19 +206,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   @Test
   public void testBadRequest()
   {
-    expectedException.expect(IAE.class);
-    expectedException.expectMessage("Received 400 Bad Request with body:");
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Received server error with status [400 Bad Request]");
 
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
     EasyMock.expect(responseHolder.getContent()).andReturn("");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
+        errorResponseHolder()
     );
     replayAll();
 
@@ -227,22 +229,22 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   @Test
   public void testTaskLocationMismatch()
   {
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
-            .andReturn(HttpResponseStatus.OK);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
     EasyMock.expect(responseHolder.getResponse()).andReturn(response);
-    EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
-            .andReturn("{}");
+    EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}");
     EasyMock.expect(response.headers()).andReturn(headers);
     EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(2);
+        errorResponseHolder()
+    ).andReturn(
+        okResponseHolder()
+    );
     replayAll();
 
     Map<Integer, Long> results = client.getCurrentOffsets(TEST_ID, true);
@@ -255,14 +257,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   public void testGetCurrentOffsets() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -288,9 +289,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3);
 
     Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6)
-            .andReturn(HttpResponseStatus.OK).times(1);
-    EasyMock.expect(responseHolder.getContent()).andReturn("").times(4)
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4);
+    EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
             .andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2);
     EasyMock.expect(response.headers()).andReturn(headers).times(2);
@@ -298,11 +298,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
 
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(3);
+        errorResponseHolder()
+    ).times(2).andReturn(
+        okResponseHolder()
+    );
 
     replayAll();
 
@@ -328,7 +330,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   public void testGetCurrentOffsetsWithExhaustedRetries()
   {
     expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]");
+    expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
+    expectedException.expectMessage("Received server error with status [404 Not Found]");
 
     client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2);
 
@@ -341,10 +344,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
-    ).andReturn(Futures.immediateFuture(responseHolder)).anyTimes();
+    ).andReturn(errorResponseHolder()).anyTimes();
     replayAll();
 
     client.getCurrentOffsets(TEST_ID, true);
@@ -355,14 +358,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   public void testGetEndOffsets() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -389,19 +391,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     DateTime now = DateTimes.nowUtc();
 
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
-            .andReturn(HttpResponseStatus.OK);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
     EasyMock.expect(responseHolder.getResponse()).andReturn(response);
     EasyMock.expect(response.headers()).andReturn(headers);
     EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null);
     EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
-    )).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(2);
+    )).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder());
     replayAll();
 
     DateTime results = client.getStartTime(TEST_ID);
@@ -424,14 +423,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     Status status = Status.READING;
 
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -453,14 +451,14 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   public void testPause() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -487,30 +485,29 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     Capture<Request> captured2 = Capture.newInstance();
     Capture<Request> captured3 = Capture.newInstance();
     // one time in IndexTaskClient.submitRequest() and another in KafkaIndexTaskClient.pause()
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2)
-            .andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED);
     EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2)
             .andReturn("{\"0\":1, \"1\":10}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured2),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured3),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
 
     replayAll();
@@ -552,10 +549,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -580,10 +577,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -609,10 +606,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -636,10 +633,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -662,10 +659,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -689,10 +686,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -725,10 +722,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -762,10 +759,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -799,10 +796,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -837,10 +834,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -874,10 +871,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -911,10 +908,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -948,10 +945,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -991,10 +988,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -1029,6 +1026,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     }
   }
 
+  private ListenableFuture<Either> okResponseHolder()
+  {
+    return Futures.immediateFuture(Either.value(responseHolder));
+  }
+
+  private ListenableFuture<Either> errorResponseHolder()
+  {
+    return Futures.immediateFuture(Either.error(responseHolder));
+  }
+
   private class TestableKafkaIndexTaskClient extends KafkaIndexTaskClient
   {
     TestableKafkaIndexTaskClient(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java
index 7be086e..1463b7a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java
@@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.hamcrest.CoreMatchers;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -183,18 +184,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   public void testInternalServerError()
   {
     expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []");
+    expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
+    expectedException.expectMessage("Received server error with status [500 Internal Server Error]");
 
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
     EasyMock.expect(responseHolder.getContent()).andReturn("");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
+        errorResponseHolder()
     );
     replayAll();
 
@@ -205,19 +207,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   @Test
   public void testBadRequest()
   {
-    expectedException.expect(IAE.class);
-    expectedException.expectMessage("Received 400 Bad Request with body:");
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Received server error with status [400 Bad Request]");
 
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
     EasyMock.expect(responseHolder.getContent()).andReturn("");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
+        errorResponseHolder()
     );
     replayAll();
 
@@ -228,22 +230,22 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   @Test
   public void testTaskLocationMismatch()
   {
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
-            .andReturn(HttpResponseStatus.OK);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
     EasyMock.expect(responseHolder.getResponse()).andReturn(response);
-    EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
-            .andReturn("{}");
+    EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}");
     EasyMock.expect(response.headers()).andReturn(headers);
     EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id");
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
     ).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(2);
+        errorResponseHolder()
+    ).andReturn(
+        okResponseHolder()
+    );
     replayAll();
 
     Map<String, String> results = client.getCurrentOffsets(TEST_ID, true);
@@ -256,14 +258,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   public void testGetCurrentOffsets() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -289,9 +290,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3);
 
     Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6)
-            .andReturn(HttpResponseStatus.OK).times(1);
-    EasyMock.expect(responseHolder.getContent()).andReturn("").times(4)
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4);
+    EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
             .andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2);
     EasyMock.expect(response.headers()).andReturn(headers).times(2);
@@ -299,11 +299,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
 
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(3);
+        errorResponseHolder()
+    ).times(2).andReturn(
+        okResponseHolder()
+    );
 
     replayAll();
 
@@ -329,7 +331,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   public void testGetCurrentOffsetsWithExhaustedRetries()
   {
     expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]");
+    expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
+    expectedException.expectMessage("Received server error with status [404 Not Found]");
 
     client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2);
 
@@ -342,10 +345,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(
         httpClient.go(
             EasyMock.anyObject(Request.class),
-            EasyMock.anyObject(StringFullResponseHandler.class),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
             EasyMock.eq(TEST_HTTP_TIMEOUT)
         )
-    ).andReturn(Futures.immediateFuture(responseHolder)).anyTimes();
+    ).andReturn(errorResponseHolder()).anyTimes();
     replayAll();
 
     client.getCurrentOffsets(TEST_ID, true);
@@ -356,14 +359,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   public void testGetEndOffsets() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -390,19 +392,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     DateTime now = DateTimes.nowUtc();
 
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
-            .andReturn(HttpResponseStatus.OK);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
     EasyMock.expect(responseHolder.getResponse()).andReturn(response);
     EasyMock.expect(response.headers()).andReturn(headers);
     EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null);
     EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
-    )).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(2);
+    )).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder());
     replayAll();
 
     DateTime results = client.getStartTime(TEST_ID);
@@ -425,14 +424,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     Status status = Status.READING;
 
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -454,14 +452,14 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
   public void testPause() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -487,30 +485,29 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     Capture<Request> captured = Capture.newInstance();
     Capture<Request> captured2 = Capture.newInstance();
     Capture<Request> captured3 = Capture.newInstance();
-    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2)
-            .andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED);
     EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2)
             .andReturn("{\"0\":1, \"1\":10}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured2),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured3),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
 
     replayAll();
@@ -552,10 +549,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -580,10 +577,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -609,10 +606,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -636,10 +633,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -662,10 +659,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     );
     replayAll();
 
@@ -689,10 +686,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -725,10 +722,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -762,10 +759,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -799,10 +796,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -837,10 +834,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -874,10 +871,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -911,10 +908,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -948,10 +945,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -991,10 +988,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(httpClient.go(
         EasyMock.capture(captured),
-        EasyMock.anyObject(StringFullResponseHandler.class),
+        EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
         EasyMock.eq(TEST_HTTP_TIMEOUT)
     )).andReturn(
-        Futures.immediateFuture(responseHolder)
+        okResponseHolder()
     ).times(numRequests);
     replayAll();
 
@@ -1029,6 +1026,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
     }
   }
 
+  private ListenableFuture<Either> okResponseHolder()
+  {
+    return Futures.immediateFuture(Either.value(responseHolder));
+  }
+
+  private ListenableFuture<Either> errorResponseHolder()
+  {
+    return Futures.immediateFuture(Either.error(responseHolder));
+  }
+
   private class TestableKinesisIndexTaskClient extends KinesisIndexTaskClient
   {
     TestableKinesisIndexTaskClient(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index 27d7759..db1692a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -28,12 +28,14 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -41,6 +43,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
@@ -217,7 +221,16 @@ public abstract class IndexTaskClient implements AutoCloseable
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
   {
-    return submitRequest(taskId, null, method, encodedPathSuffix, encodedQueryString, new byte[0], retry);
+    return submitRequest(
+        taskId,
+        null,
+        method,
+        encodedPathSuffix,
+        encodedQueryString,
+        new byte[0],
+        new StringFullResponseHandler(StandardCharsets.UTF_8),
+        retry
+    );
   }
 
   /**
@@ -239,6 +252,7 @@ public abstract class IndexTaskClient implements AutoCloseable
         encodedPathSuffix,
         encodedQueryString,
         content,
+        new StringFullResponseHandler(StandardCharsets.UTF_8),
         retry
     );
   }
@@ -262,6 +276,7 @@ public abstract class IndexTaskClient implements AutoCloseable
         encodedPathSuffix,
         encodedQueryString,
         content,
+        new StringFullResponseHandler(StandardCharsets.UTF_8),
         retry
     );
   }
@@ -293,13 +308,14 @@ public abstract class IndexTaskClient implements AutoCloseable
   /**
    * Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded.
    */
-  private StringFullResponseHolder submitRequest(
+  protected <IntermediateType, FinalType> FinalType submitRequest(
       String taskId,
       @Nullable String mediaType, // nullable if content is empty
       HttpMethod method,
       String encodedPathSuffix,
       @Nullable String encodedQueryString,
       byte[] content,
+      HttpResponseHandler<IntermediateType, FinalType> responseHandler,
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
   {
@@ -333,21 +349,38 @@ public abstract class IndexTaskClient implements AutoCloseable
           content
       );
 
-      StringFullResponseHolder response = null;
+      Either<StringFullResponseHolder, FinalType> response = null;
       try {
         // Netty throws some annoying exceptions if a connection can't be opened, which happens relatively frequently
         // for tasks that happen to still be starting up, so test the connection first to keep the logs clean.
         checkConnection(request.getUrl().getHost(), request.getUrl().getPort());
 
-        response = submitRequest(request);
+        response = submitRequest(request, responseHandler);
 
-        int responseCode = response.getStatus().getCode();
-        if (responseCode / 100 == 2) {
-          return response;
-        } else if (responseCode == 400) { // don't bother retrying if it's a bad request
-          throw new IAE("Received 400 Bad Request with body: %s", response.getContent());
+        if (response.isValue()) {
+          return response.valueOrThrow();
         } else {
-          throw new IOE("Received status [%d] and content [%s]", responseCode, response.getContent());
+          final StringBuilder exceptionMessage = new StringBuilder();
+          final HttpResponseStatus httpResponseStatus = response.error().getStatus();
+          final String httpResponseContent = response.error().getContent();
+          exceptionMessage.append("Received server error with status [").append(httpResponseStatus).append("]");
+
+          if (!Strings.isNullOrEmpty(httpResponseContent)) {
+            final String choppedMessage =
+                StringUtils.chop(
+                    StringUtils.nullToEmptyNonDruidDataString(httpResponseContent),
+                    1000
+                );
+
+            exceptionMessage.append("; first 1KB of body: ").append(choppedMessage);
+          }
+
+          if (httpResponseStatus.getCode() == 400) {
+            // don't bother retrying if it's a bad request
+            throw new IAE(exceptionMessage.toString());
+          } else {
+            throw new IOE(exceptionMessage.toString());
+          }
         }
       }
       catch (IOException | ChannelException e) {
@@ -360,9 +393,10 @@ public abstract class IndexTaskClient implements AutoCloseable
         // eventually be updated.
 
         final Duration delay;
-        if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        if (response != null && !response.isValue()
+            && response.error().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
           String headerId = StringUtils.urlDecode(
-              response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
+              response.error().getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
           );
           if (headerId != null && !headerId.equals(taskId)) {
             log.warn(
@@ -381,22 +415,24 @@ public abstract class IndexTaskClient implements AutoCloseable
         final String urlForLog = request.getUrl().toString();
         if (!retry) {
           // if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was
-          // for informational purposes only) so don't log a scary stack trace
-          log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage());
+          // for informational purposes only); log at INFO instead of WARN.
+          log.noStackTrace().info(e, "submitRequest failed for [%s]", urlForLog);
           throw e;
         } else if (delay == null) {
-          log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog);
+          // When retrying, log the final failure at WARN level, since it is likely to be bad news.
+          log.warn(e, "submitRequest failed for [%s]", urlForLog);
           throw e;
         } else {
           try {
             final long sleepTime = delay.getMillis();
-            log.warn(
-                "Bad response HTTP [%s] from [%s]; will try again in [%s] (body/exception: [%s])",
-                (response != null ? response.getStatus().getCode() : "no response"),
+            // When retrying, log non-final failures at INFO level.
+            log.noStackTrace().info(
+                e,
+                "submitRequest failed for [%s]; will try again in [%s]",
                 urlForLog,
-                new Duration(sleepTime).toString(),
-                (response != null ? response.getContent() : e.getMessage())
+                new Duration(sleepTime).toString()
             );
+
             Thread.sleep(sleepTime);
           }
           catch (InterruptedException e2) {
@@ -421,11 +457,17 @@ public abstract class IndexTaskClient implements AutoCloseable
     }
   }
 
-  private StringFullResponseHolder submitRequest(Request request) throws IOException, ChannelException
+  private <IntermediateType, FinalType> Either<StringFullResponseHolder, FinalType> submitRequest(
+      Request request,
+      HttpResponseHandler<IntermediateType, FinalType> responseHandler
+  ) throws IOException, ChannelException
   {
+    final ObjectOrErrorResponseHandler<IntermediateType, FinalType> wrappedHandler =
+        new ObjectOrErrorResponseHandler<>(responseHandler);
+
     try {
       log.debug("HTTP %s: %s", request.getMethod().getName(), request.getUrl().toString());
-      return httpClient.go(request, new StringFullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
+      return httpClient.go(request, wrappedHandler, httpTimeout).get();
     }
     catch (Exception e) {
       throw throwIfPossible(e);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
index b41ad94..128fdb3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
@@ -25,8 +25,10 @@ import com.google.common.util.concurrent.Futures;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.easymock.EasyMock;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -82,10 +84,11 @@ public class IndexTaskClientTest
     EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(
                 Futures.immediateFuture(
-                    new StringFullResponseHolder(
-                        HttpResponseStatus.OK,
-                        new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
-                        StandardCharsets.UTF_8
+                    Either.value(
+                        new StringFullResponseHolder(
+                            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
+                            StandardCharsets.UTF_8
+                        )
                     )
                 )
             )
@@ -101,9 +104,201 @@ public class IndexTaskClientTest
       );
       Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
     }
-  } 
+  }
+
+  @Test
+  public void retryOnServerError() throws IOException
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.error(
+                        new StringFullResponseHolder(
+                            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR),
+                            StandardCharsets.UTF_8
+                        ).addChunk("Error")
+                    )
+                )
+            )
+            .times(2);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.value(
+                        new StringFullResponseHolder(
+                            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
+                            StandardCharsets.UTF_8
+                        )
+                    )
+                )
+            )
+            .once();
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
+          "taskId",
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+      Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
+    }
+    EasyMock.verify(httpClient);
+  }
+
+  @Test
+  public void retryIfNotFoundWithIncorrectTaskId() throws IOException
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    final String taskId = "taskId";
+    final String incorrectTaskId = "incorrectTaskId";
+    final DefaultHttpResponse incorrectResponse =
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+    incorrectResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, incorrectTaskId);
+    final DefaultHttpResponse correctResponse =
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    correctResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId);
+
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.error(
+                        new StringFullResponseHolder(
+                            incorrectResponse,
+                            StandardCharsets.UTF_8
+                        )
+                    )
+                )
+            )
+            .times(2);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.value(
+                        new StringFullResponseHolder(
+                            correctResponse,
+                            StandardCharsets.UTF_8
+                        )
+                    )
+                )
+            )
+            .once();
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
+          taskId,
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+      Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
+    }
+    EasyMock.verify(httpClient);
+  }
+
+  @Test
+  public void dontRetryOnBadRequest()
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.error(
+                        new StringFullResponseHolder(
+                            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST),
+                            StandardCharsets.UTF_8
+                        ).addChunk("Error")
+                    )
+                )
+            )
+            .times(1);
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final IllegalArgumentException e = Assert.assertThrows(
+          IllegalArgumentException.class,
+          () -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, true)
+      );
+
+      Assert.assertEquals(
+          "Received server error with status [400 Bad Request]; first 1KB of body: Error",
+          e.getMessage()
+      );
+    }
+
+    EasyMock.verify(httpClient);
+  }
+
+  @Test
+  public void dontRetryIfRetryFalse()
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.error(
+                        new StringFullResponseHolder(
+                            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR),
+                            StandardCharsets.UTF_8
+                        ).addChunk("Error")
+                    )
+                )
+            )
+            .times(1);
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final IOException e = Assert.assertThrows(
+          IOException.class,
+          () -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, false)
+      );
+
+      Assert.assertEquals(
+          "Received server error with status [500 Internal Server Error]; first 1KB of body: Error",
+          e.getMessage()
+      );
+    }
+
+    EasyMock.verify(httpClient);
+  }
+
+  @Test
+  public void dontRetryIfNotFoundWithCorrectTaskId()
+  {
+    final String taskId = "taskId";
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+    response.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    Either.error(
+                        new StringFullResponseHolder(response, StandardCharsets.UTF_8).addChunk("Error")
+                    )
+                )
+            )
+            .times(1);
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final IOException e = Assert.assertThrows(
+          IOException.class,
+          () -> indexTaskClient.submitRequestWithEmptyContent(taskId, HttpMethod.GET, "test", null, false)
+      );
+
+      Assert.assertEquals(
+          "Received server error with status [404 Not Found]; first 1KB of body: Error",
+          e.getMessage()
+      );
+    }
+
+    EasyMock.verify(httpClient);
+  }
 
-  private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function<String, TaskLocation> taskLocationProvider)
+  private IndexTaskClient buildIndexTaskClient(
+      HttpClient httpClient,
+      Function<String, TaskLocation> taskLocationProvider
+  )
   {
     final TaskInfoProvider taskInfoProvider = new TaskInfoProvider()
     {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
index 5d6b557..c3528c7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
@@ -85,10 +85,10 @@ public class RemoteTaskActionClientTest
     responseBody.put("result", expectedLocks);
     String strResult = objectMapper.writeValueAsString(responseBody);
     final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
         response,
         StandardCharsets.UTF_8
     ).addChunk(strResult);
@@ -120,10 +120,10 @@ public class RemoteTaskActionClientTest
 
     // return status code 200 and a list with size equals 1
     final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.BAD_REQUEST,
         response,
         StandardCharsets.UTF_8
     ).addChunk("testSubmitWithIllegalStatusCode");
diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index 36caf90..b2fed85 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -89,11 +89,11 @@ public class HttpIndexingServiceClientTest
     };
 
     HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
 
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
         response,
         StandardCharsets.UTF_8
     ).addChunk(jsonMapper.writeValueAsString(samplerResponse));
@@ -142,11 +142,11 @@ public class HttpIndexingServiceClientTest
       }
     };
     HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
 
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.INTERNAL_SERVER_ERROR,
         response,
         StandardCharsets.UTF_8
     ).addChunk("");
@@ -170,13 +170,13 @@ public class HttpIndexingServiceClientTest
   {
     String taskId = "testTaskId";
     HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
 
     Map<String, Object> dummyResponse = ImmutableMap.of("test", "value");
 
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
         response,
         StandardCharsets.UTF_8
     ).addChunk(jsonMapper.writeValueAsString(dummyResponse));
@@ -209,11 +209,11 @@ public class HttpIndexingServiceClientTest
     ChannelBuffer buf = ChannelBuffers.buffer(errorMsg.length());
     buf.writeBytes(errorMsg.getBytes(StandardCharsets.UTF_8));
 
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(buf);
     EasyMock.replay(response);
 
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.NOT_FOUND,
         response,
         StandardCharsets.UTF_8
     ).addChunk("");
@@ -241,11 +241,11 @@ public class HttpIndexingServiceClientTest
   {
     String taskId = "testTaskId";
     HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
 
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
         response,
         StandardCharsets.UTF_8
     ).addChunk("");
diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
index 3b953da..8cc6956 100644
--- a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
+++ b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java
@@ -86,9 +86,10 @@ public class LookupReferencesManagerTest
     );
   }
 
-  private static HttpResponse newEmptyResponse()
+  private static HttpResponse newEmptyResponse(final HttpResponseStatus status)
   {
     final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(status).anyTimes();
     EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
     EasyMock.replay(response);
     return response;
@@ -114,8 +115,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -178,8 +178,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -219,8 +218,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -253,8 +251,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -284,8 +281,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -317,8 +313,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -354,8 +349,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -385,8 +379,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -417,8 +410,7 @@ public class LookupReferencesManagerTest
         druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")
     ).andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -479,8 +471,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -524,8 +515,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -613,8 +603,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@@ -725,8 +714,7 @@ public class LookupReferencesManagerTest
     ))
             .andReturn(request);
     StringFullResponseHolder responseHolder = new StringFullResponseHolder(
-        HttpResponseStatus.OK,
-        newEmptyResponse(),
+        newEmptyResponse(HttpResponseStatus.OK),
         StandardCharsets.UTF_8
     ).addChunk(strResult);
     EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 7defa0d..68b9eb2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -1726,7 +1726,6 @@ public class CompactSegmentsTest
     {
       final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
       final StringFullResponseHolder holder = new StringFullResponseHolder(
-          HttpResponseStatus.OK,
           httpResponse,
           StandardCharsets.UTF_8
       );
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 8f5d110..9ce4852 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -1108,7 +1108,7 @@ public class SystemSchemaTest extends CalciteTestBase
 
 
     HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
+    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp);
 
     EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
     EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
@@ -1282,7 +1282,7 @@ public class SystemSchemaTest extends CalciteTestBase
             .anyTimes();
 
     HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
+    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp);
 
     EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
 
@@ -1397,8 +1397,7 @@ public class SystemSchemaTest extends CalciteTestBase
       String json
   )
   {
-    InputStreamFullResponseHolder responseHolder =
-        new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse);
+    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResponse);
 
     byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
     responseHolder.addChunk(bytesToWrite);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org