You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by sa...@apache.org on 2017/10/04 20:07:30 UTC

aurora git commit: Convert Webhook to AbstractIdleService, use async HTTP client

Repository: aurora
Updated Branches:
  refs/heads/master 4e7cdc422 -> 0f1e68401


Convert Webhook to AbstractIdleService, use async HTTP client

Hijacking https://reviews.apache.org/r/59703

>From the above review: "Current code uses a synchronous HTTP client, which can block the EventBus. Switch to an async HTTP client."

Previously, we had an issue where the HTTP client would have a non-daemon thread which caused the Scheduler to fail to shut down. I converted Webhook into an AbstractIdleService and properly close the client in the shutDown() method. Additionally, I made a small tweak to the original code where we ABORT processing of the response once the status has been received, since we don't care about the rest of it. We just use the response code for stats.

Testing Done:
./gradlew test

Tested proper shutdown occurs in Vagrant.

Scale tested up to 2000 TASK_LOST events with the registered endpoint waiting 5-10 minutes to respond -- does not seem to block scheduling.

Bugs closed: AURORA-1773

Reviewed at https://reviews.apache.org/r/62700/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0f1e6840
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0f1e6840
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0f1e6840

Branch: refs/heads/master
Commit: 0f1e68401da64a9aa2fd507010d21b7d7d8ebf53
Parents: 4e7cdc4
Author: Jordan Ly <jo...@gmail.com>
Authored: Wed Oct 4 13:07:19 2017 -0700
Committer: Santhosh Kumar <ss...@twitter.com>
Committed: Wed Oct 4 13:07:19 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  10 +-
 .../apache/aurora/scheduler/events/Webhook.java | 117 ++++--
 .../aurora/scheduler/events/WebhookInfo.java    |  14 +
 .../aurora/scheduler/events/WebhookModule.java  |  36 +-
 .../org/apache/aurora/scheduler/webhook.json    |   4 +-
 .../aurora/scheduler/events/WebhookTest.java    | 413 ++++++++++++-------
 6 files changed, 371 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 069c62e..d7a4287 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,14 +82,14 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
   ext.gsonRev = '2.3.1'
   ext.guavaRev = '20.0'
   ext.guiceRev = '3.0'
-  ext.httpclientRev = '4.5.2'
-  ext.httpcoreRev = '4.4.4'
+  ext.asyncHttpclientRev = '2.0.37'
   ext.jacksonRev = '2.5.1'
   ext.jerseyRev = '1.19'
   ext.jsrRev = '3.0.1'
   ext.junitRev = '4.12'
   ext.logbackRev = '1.2.3'
   ext.mybatisRev = '3.4.1'
+  ext.nettyRev = '4.0.52.Final'
   ext.protobufRev = '2.6.1'
   ext.servletRev = '3.1.0'
   ext.slf4jRev = '1.7.25'
@@ -110,13 +110,12 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
       // See http://forums.gradle.org/gradle/topics/shouldnt-resolutionstrategy-affect-depending-projects-transitive-dependencies
       resolutionStrategy {
         failOnVersionConflict()
-        force "org.apache.httpcomponents:httpclient:${httpclientRev}"
-        force "org.apache.httpcomponents:httpcore:${httpcoreRev}"
         force "com.fasterxml.jackson.core:jackson-annotations:${jacksonRev}"
         force "com.fasterxml.jackson.core:jackson-core:${jacksonRev}"
         force "com.google.code.gson:gson:${gsonRev}"
         force "com.google.guava:guava:${guavaRev}"
         force "com.google.protobuf:protobuf-java:${protobufRev}"
+        force "io.netty:netty-handler:${nettyRev}"
         force "junit:junit:${junitRev}"
         force "org.apache.thrift:libthrift:${thriftRev}"
         force "org.apache.zookeeper:zookeeper:${zookeeperRev}"
@@ -415,8 +414,7 @@ dependencies {
   compile "org.apache.curator:curator-framework:${curatorRev}"
   compile "org.apache.curator:curator-recipes:${curatorRev}"
   compile 'org.apache.mesos:mesos:1.2.0'
-  compile "org.apache.httpcomponents:httpclient:${httpclientRev}"
-  compile "org.apache.httpcomponents:httpcore:${httpcoreRev}"
+  compile "org.asynchttpclient:async-http-client:${asyncHttpclientRev}"
   compile "org.apache.shiro:shiro-guice:${shiroRev}"
   compile "org.apache.shiro:shiro-web:${shiroRev}"
   compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"

http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
index 05f46a1..2af8118 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
@@ -13,59 +13,73 @@
  */
 package org.apache.aurora.scheduler.events;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.eventbus.Subscribe;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.Inject;
 
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.HttpResponseStatus;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.util.HttpConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Watches TaskStateChanges and send events to configured endpoint.
  */
-public class Webhook implements EventSubscriber {
+public class Webhook extends AbstractIdleService implements EventSubscriber {
+  @VisibleForTesting
+  static final String ATTEMPTS_STAT_NAME = "webhooks_attempts";
+  @VisibleForTesting
+  static final String SUCCESS_STAT_NAME = "webhooks_success";
+  @VisibleForTesting
+  static final String ERRORS_STAT_NAME = "webhooks_errors";
+  @VisibleForTesting
+  static final String USER_ERRORS_STAT_NAME = "webhooks_user_errors";
 
   private static final Logger LOG = LoggerFactory.getLogger(Webhook.class);
 
   private final WebhookInfo webhookInfo;
-  private final CloseableHttpClient httpClient;
+  private final AsyncHttpClient httpClient;
   private final Predicate<ScheduleStatus> isWhitelisted;
 
+  private final AtomicLong attemptsCounter;
+  private final AtomicLong successCounter;
+  private final AtomicLong errorsCounter;
+  private final AtomicLong userErrorsCounter;
+
   @Inject
-  Webhook(CloseableHttpClient httpClient, WebhookInfo webhookInfo) {
-    this.webhookInfo = webhookInfo;
-    this.httpClient = httpClient;
-    // A task status is whitelisted if: a) the whitelist is absent, or b) the task status is
-    // explicitly specified in the whitelist.
+  Webhook(AsyncHttpClient httpClient, WebhookInfo webhookInfo, StatsProvider statsProvider) {
+    this.webhookInfo = requireNonNull(webhookInfo);
+    this.httpClient = requireNonNull(httpClient);
+    this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME);
+    this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME);
+    this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME);
+    this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME);
     this.isWhitelisted = status -> !webhookInfo.getWhitelistedStatuses().isPresent()
         || webhookInfo.getWhitelistedStatuses().get().contains(status);
     LOG.info("Webhook enabled with info" + this.webhookInfo);
   }
 
-  private HttpPost createPostRequest(TaskStateChange stateChange)
-      throws UnsupportedEncodingException {
-    String eventJson = stateChange.toJson();
-    HttpPost post = new HttpPost();
-    post.setURI(webhookInfo.getTargetURI());
-    post.setHeader("Timestamp", Long.toString(Instant.now().toEpochMilli()));
-    post.setEntity(new StringEntity(eventJson));
-    webhookInfo.getHeaders().entrySet().forEach(
-        e -> post.setHeader(e.getKey(), e.getValue()));
-    return post;
+  private BoundRequestBuilder createRequest(TaskStateChange stateChange) {
+    return httpClient.preparePost(webhookInfo.getTargetURI().toString())
+        .setBody(stateChange.toJson())
+        .setSingleHeaders(webhookInfo.getHeaders())
+        .addHeader("Timestamp", Long.toString(Instant.now().toEpochMilli()));
   }
 
   /**
@@ -82,20 +96,49 @@ public class Webhook implements EventSubscriber {
     // resend the entire state. This check also ensures that only whitelisted statuses will be sent
     // to the configured endpoint.
     if (stateChange.getOldState().isPresent() && isWhitelisted.apply(stateChange.getNewState())) {
+      attemptsCounter.incrementAndGet();
       try {
-        HttpPost post = createPostRequest(stateChange);
-        // Using try-with-resources on closeable and following
-        // https://hc.apache.org/httpcomponents-client-4.5.x/quickstart.html to make sure stream is
-        // closed after we get back a response to not leak http connections.
-        try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
-          HttpEntity entity = httpResponse.getEntity();
-          EntityUtils.consumeQuietly(entity);
-        } catch (IOException exp) {
-          LOG.error("Error sending a Webhook event", exp);
-        }
-      } catch (UnsupportedEncodingException exp) {
-        LOG.error("HttpPost exception when creating an HTTP Post request", exp);
+        // We don't care about the response body, so only listen for the HTTP status code.
+        createRequest(stateChange).execute(new AsyncCompletionHandler<Integer>() {
+          @Override
+          public void onThrowable(Throwable t) {
+            errorsCounter.incrementAndGet();
+            LOG.error("Error sending a Webhook event", t);
+          }
+
+          @Override
+          public State onStatusReceived(HttpResponseStatus status) throws Exception {
+            if (status.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
+              successCounter.incrementAndGet();
+            } else {
+              userErrorsCounter.incrementAndGet();
+            }
+
+            // Abort after we get the status because that is all we use for processing.
+            return State.ABORT;
+          }
+
+          @Override
+          public Integer onCompleted(Response response) throws Exception {
+            // We do not care about the full response.
+            return 0;
+          }
+        });
+      } catch (Exception e) {
+        LOG.error("Error making Webhook request", e);
+        errorsCounter.incrementAndGet();
       }
     }
   }
+
+  @Override
+  protected void startUp() throws Exception {
+    // No-op
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Shutting down async Webhook client.");
+    httpClient.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
index da22c21..44789d6 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
@@ -129,6 +129,20 @@ public class WebhookInfo {
       return this;
     }
 
+    /**
+     * This method will add the supplied headers to the current headers.
+     *
+     * @param values The headers to add.
+     * @return The modified builder.
+     */
+    public WebhookInfoBuilder setHeaders(Map<String, String> values) {
+      for (Map.Entry<String, String> entry : values.entrySet()) {
+        setHeader(entry.getKey(), entry.getValue());
+      }
+
+      return this;
+    }
+
     public WebhookInfoBuilder setTargetURL(String targetURL) {
       this.targetURL = targetURL;
       return this;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
index 1f10af7..8c9ea05 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
@@ -29,17 +29,18 @@ import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.CanRead;
 import org.apache.aurora.common.args.constraints.Exists;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+
 /**
  * Binding module for webhook management.
  */
@@ -70,24 +71,23 @@ public class WebhookModule extends AbstractModule {
   protected void configure() {
     if (enableWebhook) {
       WebhookInfo webhookInfo = parseWebhookConfig(readWebhookFile());
-      int timeout = webhookInfo.getConnectonTimeoutMsec();
-      RequestConfig config = RequestConfig.custom()
-          .setConnectTimeout(timeout) // establish connection with server eg time to TCP handshake.
-          .setConnectionRequestTimeout(timeout)  // get a connection from internal pool.
-          .setSocketTimeout(timeout) // wait for data after connection was established.
+      DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
+          .setConnectTimeout(webhookInfo.getConnectonTimeoutMsec())
+          .setHandshakeTimeout(webhookInfo.getConnectonTimeoutMsec())
+          .setSslSessionTimeout(webhookInfo.getConnectonTimeoutMsec())
+          .setReadTimeout(webhookInfo.getConnectonTimeoutMsec())
+          .setRequestTimeout(webhookInfo.getConnectonTimeoutMsec())
+          .setKeepAliveStrategy(new DefaultKeepAliveStrategy())
           .build();
-      ConnectionKeepAliveStrategy connectionStrategy = new DefaultConnectionKeepAliveStrategy();
-      CloseableHttpClient client =
-          HttpClientBuilder.create()
-              .setDefaultRequestConfig(config)
-              // being explicit about using default Keep-Alive strategy.
-              .setKeepAliveStrategy(connectionStrategy)
-              .build();
+      AsyncHttpClient httpClient = asyncHttpClient(config);
 
       bind(WebhookInfo.class).toInstance(webhookInfo);
-      bind(CloseableHttpClient.class).toInstance(client);
+      bind(AsyncHttpClient.class).toInstance(httpClient);
       PubsubEventModule.bindSubscriber(binder(), Webhook.class);
       bind(Webhook.class).in(Singleton.class);
+
+      SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+          .to(Webhook.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/resources/org/apache/aurora/scheduler/webhook.json
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/webhook.json b/src/main/resources/org/apache/aurora/scheduler/webhook.json
index b78c063..e645f64 100644
--- a/src/main/resources/org/apache/aurora/scheduler/webhook.json
+++ b/src/main/resources/org/apache/aurora/scheduler/webhook.json
@@ -3,6 +3,6 @@
     "Content-Type": "application/vnd.kafka.json.v1+json",
     "Producer-Type": "reliable"
   },
-  "targetURL": "http://localhost:5000/",
-  "timeoutMsec": 50
+  "targetURL": "http://localhost:8080/",
+  "timeoutMsec": 5000
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
index 07f39fa..827aa2d 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
@@ -17,259 +17,352 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableMap;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import com.google.common.collect.ImmutableMap;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.WebhookInfo.WebhookInfoBuilder;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.easymock.Capture;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.asynchttpclient.AsyncHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class WebhookTest extends EasyMockTest {
 
+public class WebhookTest {
+  private static final String STATIC_URL = "http://localhost:8080/";
+  private static final Integer TIMEOUT = 5000;
+  private static final Map<String, String> HEADERS = ImmutableMap.of(
+      "Content-Type", "application/vnd.kafka.json.v1+json",
+      "Producer-Type", "reliable"
+  );
   private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", TaskTestUtil.JOB);
   private static final TaskStateChange CHANGE = TaskStateChange.initialized(TASK);
-  private static final TaskStateChange CHANGE_WITH_OLD_STATE = TaskStateChange
+  private static final TaskStateChange CHANGE_OLD_STATE = TaskStateChange
       .transition(TASK, ScheduleStatus.FAILED);
-  private static final String CHANGE_JSON = CHANGE_WITH_OLD_STATE.toJson();
+  private static final TaskStateChange CHANGE_LOST = TaskStateChange.transition(
+      TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L), ScheduleStatus.LOST);
+  private static final String CHANGE_JSON = CHANGE_OLD_STATE.toJson();
+  private static final String CHANGE_LOST_JSON  = CHANGE_LOST.toJson();
+  // Below are test fixtures for WebhookInfoBuilders. Callers will need to specify the desired
+  // targetURL and build manually to get the desired WebhookInfo. We do this because we allocate
+  // an ephemeral port for our test Jetty server, meaning we cannot specify WebhookInfo statically.
   // Test fixture for WebhookInfo without a whitelist, thus all task statuses are implicitly
   // whitelisted.
-  private static final WebhookInfo WEBHOOK_INFO = WebhookInfo.newBuilder()
-      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
-      .setHeader("Producer-Type", "reliable")
-      .setTargetURL("http://localhost:5000/")
-      .setTimeout(50)
-      .build();
+  private static final WebhookInfoBuilder WEBHOOK_INFO_BUILDER = WebhookInfo
+      .newBuilder()
+      .setHeaders(HEADERS)
+      .setTimeout(TIMEOUT);
   // Test fixture for WebhookInfo in which only "LOST" and "FAILED" task statuses are explicitly
   // whitelisted.
-  private static final WebhookInfo WEBHOOK_INFO_WITH_WHITELIST = WebhookInfo.newBuilder()
-      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
-      .setHeader("Producer-Type", "reliable")
-      .setTargetURL("http://localhost:5000/")
-      .setTimeout(50)
+  private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WHITELIST_BUILDER = WebhookInfo
+      .newBuilder()
+      .setHeaders(HEADERS)
+      .setTimeout(TIMEOUT)
       .addWhitelistedStatus("LOST")
-      .addWhitelistedStatus("FAILED")
-      .build();
+      .addWhitelistedStatus("FAILED");
   // Test fixture for WebhookInfo in which all task statuses are whitelisted by wildcard character.
-  private static final WebhookInfo WEBHOOK_INFO_WITH_WILDCARD_WHITELIST = WebhookInfo.newBuilder()
-      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
-      .setHeader("Producer-Type", "reliable")
-      .setTargetURL("http://localhost:5000/")
-      .setTimeout(50)
-      .addWhitelistedStatus("*")
-      .build();
+  private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER =
+      WebhookInfo
+          .newBuilder()
+          .setHeaders(HEADERS)
+          .setTimeout(TIMEOUT)
+          .addWhitelistedStatus("*");
+
+  private Server jettyServer;
+  private AsyncHttpClient httpClient;
+  private FakeStatsProvider statsProvider;
+
+  /**
+   * Wrap the DefaultAsyncHttpClient for testing so we can complete futures synchronously before
+   * validating assertions. Otherwise, we would have to call `Thread.sleep` in our tests after
+   * each TaskStateChange.
+   */
+  static class WebhookAsyncHttpClientWrapper extends DefaultAsyncHttpClient {
 
-  private CloseableHttpClient httpClient;
-  private Webhook webhook;
+    WebhookAsyncHttpClientWrapper(DefaultAsyncHttpClientConfig config) {
+      super(config);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> executeRequest(org.asynchttpclient.Request request,
+                                                  AsyncHandler<T> handler) {
+      ListenableFuture<T> future = super.executeRequest(request, handler);
+      try {
+        future.get();
+        future.done();
+      } catch (InterruptedException | ExecutionException e) {
+        // Ignore, future should not fail to resolve.
+      }
+      return future;
+    }
+  }
 
   @Before
-  public void setUp() {
-    httpClient = createMock(CloseableHttpClient.class);
-    webhook = new Webhook(httpClient, WEBHOOK_INFO);
+  public void setUp() throws Exception {
+    DefaultAsyncHttpClientConfig testConfig = new DefaultAsyncHttpClientConfig.Builder()
+        .setConnectTimeout(TIMEOUT)
+        .setHandshakeTimeout(TIMEOUT)
+        .setSslSessionTimeout(TIMEOUT)
+        .setReadTimeout(TIMEOUT)
+        .setRequestTimeout(TIMEOUT)
+        .setKeepAliveStrategy(new DefaultKeepAliveStrategy())
+        .build();
+    httpClient = new WebhookAsyncHttpClientWrapper(testConfig);
+    statsProvider = new FakeStatsProvider();
+    jettyServer = new Server(0); // Start Jetty server with ephemeral port
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    jettyServer.stop();
   }
 
   @Test
   public void testTaskChangedStateNoOldState() throws Exception {
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
     // Should be a noop as oldState is MIA so this test would have throw an exception.
     // If it does not, then we are good.
-    control.replay();
-
     webhook.taskChangedState(CHANGE);
   }
 
   @Test
-  public void testTaskChangedWithOldState() throws Exception {
-    CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class);
-    HttpEntity entity = createMock(HttpEntity.class);
-
-    Capture<HttpPost> httpPostCapture = createCapture();
-    expect(entity.isStreaming()).andReturn(false);
-    expect(httpResponse.getEntity()).andReturn(entity);
-    httpResponse.close();
-    expectLastCall().once();
-    expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse);
-
-    control.replay();
-
-    webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
-
-    assertTrue(httpPostCapture.hasCaptured());
-    assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
-    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), CHANGE_JSON);
-    Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
-    assertEquals(producerTypeHeader.length, 1);
-    assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
-    assertEquals(producerTypeHeader[0].getValue(), "reliable");
-    Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type");
-    assertEquals(contentTypeHeader.length, 1);
-    assertEquals(contentTypeHeader[0].getName(), "Content-Type");
-    assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json");
-    assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
+  public void testTaskChangedWithOldStateSuccess() throws Exception {
+    jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+    jettyServer.start();
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+    webhook.taskChangedState(CHANGE_OLD_STATE);
+
+    assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
   }
 
   @Test
-  public void testTaskChangeInWhiteList() throws Exception {
-    CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class);
-    HttpEntity entity = createMock(HttpEntity.class);
+  public void testTaskChangedWithOldStateUserError() throws Exception {
+    // We expect CHANGE_JSON but get CHANGE_LOST which causes an error code to be returned.
+    jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+    jettyServer.start();
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+    webhook.taskChangedState(CHANGE_LOST);
 
-    Capture<HttpPost> httpPostCapture = createCapture();
-    expect(entity.isStreaming()).andReturn(false);
-    expect(httpResponse.getEntity()).andReturn(entity);
-    httpResponse.close();
-    expectLastCall().once();
-    expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse);
+    assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
+  }
 
-    control.replay();
+  @Test
+  public void testTaskChangedWithOldStateError() throws Exception {
+    // We have a special handler here to cause a TimeoutException to trigger `onThrowable`
+    jettyServer.setHandler(new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, HttpServletRequest request,
+                         HttpServletResponse response) throws IOException, ServletException {
+        try {
+          Thread.sleep(TIMEOUT + 100);
+        } catch (InterruptedException e) {
+          // Should never get here.
+        }
+      }
+    });
+    jettyServer.start();
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
 
-    // Verifying TaskStateChange in the whitelist is sent to the configured endpoint.
-    Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
-    TaskStateChange taskStateChangeInWhitelist = TaskStateChange
-        .transition(TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L),
-            ScheduleStatus.RUNNING);
-    webhookWithWhitelist.taskChangedState(taskStateChangeInWhitelist);
-
-    assertTrue(httpPostCapture.hasCaptured());
-    assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
-    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()),
-        taskStateChangeInWhitelist.toJson());
-    Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
-    assertEquals(producerTypeHeader.length, 1);
-    assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
-    assertEquals(producerTypeHeader[0].getValue(), "reliable");
-    Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type");
-    assertEquals(contentTypeHeader.length, 1);
-    assertEquals(contentTypeHeader[0].getName(), "Content-Type");
-    assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json");
-    assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
+    webhook.taskChangedState(CHANGE_OLD_STATE);
+
+    assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
   }
 
   @Test
-  public void testTaskChangeNotInWhiteList() throws Exception {
-    control.replay();
+  public void testTaskChangeInWhiteList() throws Exception {
+    // Verifying TaskStateChange in the whitelist is sent to the configured endpoint.
+    jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_LOST_JSON));
+    jettyServer.start();
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
 
-    // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint.
-    Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
-    webhookWithWhitelist.taskChangedState(CHANGE_WITH_OLD_STATE);
+    webhook.taskChangedState(CHANGE_LOST);
+
+    assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+    assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+    assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
   }
 
   @Test
-  public void testCatchHttpClientException() throws Exception {
-    // IOException should be silenced.
-    Capture<HttpPost> httpPostCapture = createCapture();
-    expect(httpClient.execute(capture(httpPostCapture)))
-        .andThrow(new IOException());
-    control.replay();
-
-    webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
+  public void testTaskChangeNotInWhiteList() throws Exception {
+    // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint.
+    jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+    jettyServer.start();
+    WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER);
+    Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+    webhook.taskChangedState(CHANGE_OLD_STATE);
   }
 
   @Test
   public void testParsingWebhookInfo() throws Exception {
+    WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER
+        .setTargetURL(STATIC_URL)
+        .build();
+
     WebhookInfo parsedWebhookInfo = WebhookModule.parseWebhookConfig(
         WebhookModule.readWebhookFile());
     // Verifying the WebhookInfo parsed from webhook.json file is identical to the WebhookInfo
     // built from WebhookInfoBuilder.
-    assertEquals(parsedWebhookInfo.toString(), WEBHOOK_INFO.toString());
+    assertEquals(parsedWebhookInfo.toString(), webhookInfo.toString());
     // Verifying all attributes were parsed correctly.
-    assertEquals(parsedWebhookInfo.getHeaders(), WEBHOOK_INFO.getHeaders());
-    assertEquals(parsedWebhookInfo.getTargetURI(), WEBHOOK_INFO.getTargetURI());
+    assertEquals(parsedWebhookInfo.getHeaders(), webhookInfo.getHeaders());
+    assertEquals(parsedWebhookInfo.getTargetURI(), webhookInfo.getTargetURI());
     assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(),
-        WEBHOOK_INFO.getConnectonTimeoutMsec());
-    assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), WEBHOOK_INFO.getWhitelistedStatuses());
-    control.replay();
+        webhookInfo.getConnectonTimeoutMsec());
+    assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), webhookInfo.getWhitelistedStatuses());
   }
 
   @Test
   public void testWebhookInfo() throws Exception {
-    assertEquals(WEBHOOK_INFO.toString(),
+    WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER
+        .setTargetURL(STATIC_URL)
+        .build();
+
+    assertEquals(webhookInfo.toString(),
         "WebhookInfo{headers={"
             + "Content-Type=application/vnd.kafka.json.v1+json, "
             + "Producer-Type=reliable"
             + "}, "
-            + "targetURI=http://localhost:5000/, "
-            + "connectTimeoutMsec=50, "
+            + "targetURI=http://localhost:8080/, "
+            + "connectTimeoutMsec=5000, "
             + "whitelistedStatuses=null"
             + "}");
     // Verifying all attributes were set correctly.
-    Map<String, String> headers = ImmutableMap.of(
-        "Content-Type", "application/vnd.kafka.json.v1+json",
-        "Producer-Type", "reliable");
-    assertEquals(WEBHOOK_INFO.getHeaders(), headers);
-    URI targetURI = new URI("http://localhost:5000/");
-    assertEquals(WEBHOOK_INFO.getTargetURI(), targetURI);
-    Integer timeoutMsec = 50;
-    assertEquals(WEBHOOK_INFO.getConnectonTimeoutMsec(), timeoutMsec);
-    assertFalse(WEBHOOK_INFO.getWhitelistedStatuses().isPresent());
-    control.replay();
+    assertEquals(webhookInfo.getHeaders(), HEADERS);
+    assertEquals(webhookInfo.getTargetURI(), URI.create(STATIC_URL));
+    assertEquals(webhookInfo.getConnectonTimeoutMsec(), TIMEOUT);
+    assertFalse(webhookInfo.getWhitelistedStatuses().isPresent());
   }
 
   @Test
   public void testWebhookInfoWithWhiteList() throws Exception {
-    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.toString(),
+    WebhookInfo webhookInfoWithWhitelist = WEBHOOK_INFO_WITH_WHITELIST_BUILDER
+        .setTargetURL(STATIC_URL)
+        .build();
+
+    assertEquals(webhookInfoWithWhitelist.toString(),
         "WebhookInfo{headers={"
             + "Content-Type=application/vnd.kafka.json.v1+json, "
             + "Producer-Type=reliable"
             + "}, "
-            + "targetURI=http://localhost:5000/, "
-            + "connectTimeoutMsec=50, "
+            + "targetURI=http://localhost:8080/, "
+            + "connectTimeoutMsec=5000, "
             + "whitelistedStatuses=[LOST, FAILED]"
             + "}");
     // Verifying all attributes were set correctly.
-    Map<String, String> headers = ImmutableMap.of(
-        "Content-Type", "application/vnd.kafka.json.v1+json",
-        "Producer-Type", "reliable");
-    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getHeaders(), headers);
-    URI targetURI = new URI("http://localhost:5000/");
-    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getTargetURI(), targetURI);
-    Integer timeoutMsec = 50;
-    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
-    List<ScheduleStatus> statuses = WEBHOOK_INFO_WITH_WHITELIST.getWhitelistedStatuses().get();
+    assertEquals(webhookInfoWithWhitelist.getHeaders(), HEADERS);
+    assertEquals(webhookInfoWithWhitelist.getTargetURI(), URI.create(STATIC_URL));
+    assertEquals(webhookInfoWithWhitelist.getConnectonTimeoutMsec(), TIMEOUT);
+    List<ScheduleStatus> statuses = webhookInfoWithWhitelist.getWhitelistedStatuses().get();
     assertEquals(statuses.size(), 2);
     assertEquals(statuses.get(0), ScheduleStatus.LOST);
     assertEquals(statuses.get(1), ScheduleStatus.FAILED);
-    control.replay();
   }
 
   @Test
   public void testWebhookInfoWithWildcardWhitelist() throws Exception {
-    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.toString(),
+    WebhookInfo webhookInfoWithWildcardWhitelist = WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER
+        .setTargetURL(STATIC_URL)
+        .build();
+
+    assertEquals(webhookInfoWithWildcardWhitelist.toString(),
         "WebhookInfo{headers={"
             + "Content-Type=application/vnd.kafka.json.v1+json, "
             + "Producer-Type=reliable"
             + "}, "
-            + "targetURI=http://localhost:5000/, "
-            + "connectTimeoutMsec=50, "
+            + "targetURI=http://localhost:8080/, "
+            + "connectTimeoutMsec=5000, "
             + "whitelistedStatuses=null"
             + "}");
     // Verifying all attributes were set correctly.
-    Map<String, String> headers = ImmutableMap.of(
-        "Content-Type", "application/vnd.kafka.json.v1+json",
-        "Producer-Type", "reliable");
-    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getHeaders(), headers);
-    URI targetURI = new URI("http://localhost:5000/");
-    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getTargetURI(), targetURI);
-    Integer timeoutMsec = 50;
-    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
-    assertFalse(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getWhitelistedStatuses().isPresent());
-    control.replay();
+    assertEquals(webhookInfoWithWildcardWhitelist.getHeaders(), HEADERS);
+    assertEquals(webhookInfoWithWildcardWhitelist.getTargetURI(), URI.create(STATIC_URL));
+    assertEquals(webhookInfoWithWildcardWhitelist.getConnectonTimeoutMsec(), TIMEOUT);
+    assertFalse(webhookInfoWithWildcardWhitelist.getWhitelistedStatuses().isPresent());
+  }
+
+  /** Create a Jetty handler: 200 if the request is valid and the body matches, 500 otherwise. */
+  private AbstractHandler createHandlerThatExpectsContent(String expected) {
+    return new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, HttpServletRequest request,
+                         HttpServletResponse response) throws IOException, ServletException {
+        String body = request.getReader().lines().collect(Collectors.joining());  // drops newlines
+        if (validateRequest(request) && body.equals(expected)) {
+          response.setStatus(HttpServletResponse.SC_OK);
+        } else {
+          response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        }
+        baseRequest.setHandled(true);  // mark the request as handled for Jetty
+      }
+    };
+  }
+
+  /** Check that the request is a POST carrying every HEADERS entry with its exact value. */
+  private boolean validateRequest(HttpServletRequest request) {
+    // Anything other than POST is rejected.
+    if (!request.getMethod().equals("POST")) {
+      return false;
+    }
+
+    for (Map.Entry<String, String> header : HEADERS.entrySet()) {
+      String expectedKey = header.getKey();
+      String expectedValue = header.getValue();
+
+      if (!expectedValue.equals(request.getHeader(expectedKey))) {  // absent or different
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Build a {@code WebhookInfo} whose target URL is {@code http://localhost:<jetty port>}.
+   * {@code jettyServer} should have been started before this method is called.
+   */
+  private WebhookInfo buildWebhookInfoWithJettyPort(WebhookInfoBuilder builder) {
+    String fullUrl = String.format("http://localhost:%d", jettyServer.getURI().getPort());
+    return builder
+        .setTargetURL(fullUrl)  // NOTE(review): likely mutates the passed-in builder; confirm
+        .build();
   }
 }