You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by sa...@apache.org on 2017/10/04 20:07:30 UTC
aurora git commit: Convert Webhook to AbstractIdleService,
use async HTTP client
Repository: aurora
Updated Branches:
refs/heads/master 4e7cdc422 -> 0f1e68401
Convert Webhook to AbstractIdleService, use async HTTP client
Hijacking https://reviews.apache.org/r/59703
From the above review: "Current code uses a synchronous HTTP client, which can block the EventBus. Switch to an async HTTP client."
Previously, we had an issue where the HTTP client would have a non-daemon thread which caused the Scheduler to fail to shut down. I converted it into an AbstractIdleService and properly closed the client in the shutDown() method. Additionally, I made a small tweak to the original code where we ABORT any response received after the status, since we don't care about the rest of the response. We just use the response code for stats.
Testing Done:
./gradlew test
Tested proper shutdown occurs in Vagrant.
Scale tested up to 2000 TASK_LOST events with the registered endpoint waiting 5-10 minutes to respond -- does not seem to block scheduling.
Bugs closed: AURORA-1773
Reviewed at https://reviews.apache.org/r/62700/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0f1e6840
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0f1e6840
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0f1e6840
Branch: refs/heads/master
Commit: 0f1e68401da64a9aa2fd507010d21b7d7d8ebf53
Parents: 4e7cdc4
Author: Jordan Ly <jo...@gmail.com>
Authored: Wed Oct 4 13:07:19 2017 -0700
Committer: Santhosh Kumar <ss...@twitter.com>
Committed: Wed Oct 4 13:07:19 2017 -0700
----------------------------------------------------------------------
build.gradle | 10 +-
.../apache/aurora/scheduler/events/Webhook.java | 117 ++++--
.../aurora/scheduler/events/WebhookInfo.java | 14 +
.../aurora/scheduler/events/WebhookModule.java | 36 +-
.../org/apache/aurora/scheduler/webhook.json | 4 +-
.../aurora/scheduler/events/WebhookTest.java | 413 ++++++++++++-------
6 files changed, 371 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 069c62e..d7a4287 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,14 +82,14 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
ext.gsonRev = '2.3.1'
ext.guavaRev = '20.0'
ext.guiceRev = '3.0'
- ext.httpclientRev = '4.5.2'
- ext.httpcoreRev = '4.4.4'
+ ext.asyncHttpclientRev = '2.0.37'
ext.jacksonRev = '2.5.1'
ext.jerseyRev = '1.19'
ext.jsrRev = '3.0.1'
ext.junitRev = '4.12'
ext.logbackRev = '1.2.3'
ext.mybatisRev = '3.4.1'
+ ext.nettyRev = '4.0.52.Final'
ext.protobufRev = '2.6.1'
ext.servletRev = '3.1.0'
ext.slf4jRev = '1.7.25'
@@ -110,13 +110,12 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
// See http://forums.gradle.org/gradle/topics/shouldnt-resolutionstrategy-affect-depending-projects-transitive-dependencies
resolutionStrategy {
failOnVersionConflict()
- force "org.apache.httpcomponents:httpclient:${httpclientRev}"
- force "org.apache.httpcomponents:httpcore:${httpcoreRev}"
force "com.fasterxml.jackson.core:jackson-annotations:${jacksonRev}"
force "com.fasterxml.jackson.core:jackson-core:${jacksonRev}"
force "com.google.code.gson:gson:${gsonRev}"
force "com.google.guava:guava:${guavaRev}"
force "com.google.protobuf:protobuf-java:${protobufRev}"
+ force "io.netty:netty-handler:${nettyRev}"
force "junit:junit:${junitRev}"
force "org.apache.thrift:libthrift:${thriftRev}"
force "org.apache.zookeeper:zookeeper:${zookeeperRev}"
@@ -415,8 +414,7 @@ dependencies {
compile "org.apache.curator:curator-framework:${curatorRev}"
compile "org.apache.curator:curator-recipes:${curatorRev}"
compile 'org.apache.mesos:mesos:1.2.0'
- compile "org.apache.httpcomponents:httpclient:${httpclientRev}"
- compile "org.apache.httpcomponents:httpcore:${httpcoreRev}"
+ compile "org.asynchttpclient:async-http-client:${asyncHttpclientRev}"
compile "org.apache.shiro:shiro-guice:${shiroRev}"
compile "org.apache.shiro:shiro-web:${shiroRev}"
compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
index 05f46a1..2af8118 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
@@ -13,59 +13,73 @@
*/
package org.apache.aurora.scheduler.events;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.HttpResponseStatus;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
+
/**
* Watches TaskStateChanges and send events to configured endpoint.
*/
-public class Webhook implements EventSubscriber {
+public class Webhook extends AbstractIdleService implements EventSubscriber {
+ @VisibleForTesting
+ static final String ATTEMPTS_STAT_NAME = "webhooks_attempts";
+ @VisibleForTesting
+ static final String SUCCESS_STAT_NAME = "webhooks_success";
+ @VisibleForTesting
+ static final String ERRORS_STAT_NAME = "webhooks_errors";
+ @VisibleForTesting
+ static final String USER_ERRORS_STAT_NAME = "webhooks_user_errors";
private static final Logger LOG = LoggerFactory.getLogger(Webhook.class);
private final WebhookInfo webhookInfo;
- private final CloseableHttpClient httpClient;
+ private final AsyncHttpClient httpClient;
private final Predicate<ScheduleStatus> isWhitelisted;
+ private final AtomicLong attemptsCounter;
+ private final AtomicLong successCounter;
+ private final AtomicLong errorsCounter;
+ private final AtomicLong userErrorsCounter;
+
@Inject
- Webhook(CloseableHttpClient httpClient, WebhookInfo webhookInfo) {
- this.webhookInfo = webhookInfo;
- this.httpClient = httpClient;
- // A task status is whitelisted if: a) the whitelist is absent, or b) the task status is
- // explicitly specified in the whitelist.
+ Webhook(AsyncHttpClient httpClient, WebhookInfo webhookInfo, StatsProvider statsProvider) {
+ this.webhookInfo = requireNonNull(webhookInfo);
+ this.httpClient = requireNonNull(httpClient);
+ this.attemptsCounter = statsProvider.makeCounter(ATTEMPTS_STAT_NAME);
+ this.successCounter = statsProvider.makeCounter(SUCCESS_STAT_NAME);
+ this.errorsCounter = statsProvider.makeCounter(ERRORS_STAT_NAME);
+ this.userErrorsCounter = statsProvider.makeCounter(USER_ERRORS_STAT_NAME);
this.isWhitelisted = status -> !webhookInfo.getWhitelistedStatuses().isPresent()
|| webhookInfo.getWhitelistedStatuses().get().contains(status);
LOG.info("Webhook enabled with info" + this.webhookInfo);
}
- private HttpPost createPostRequest(TaskStateChange stateChange)
- throws UnsupportedEncodingException {
- String eventJson = stateChange.toJson();
- HttpPost post = new HttpPost();
- post.setURI(webhookInfo.getTargetURI());
- post.setHeader("Timestamp", Long.toString(Instant.now().toEpochMilli()));
- post.setEntity(new StringEntity(eventJson));
- webhookInfo.getHeaders().entrySet().forEach(
- e -> post.setHeader(e.getKey(), e.getValue()));
- return post;
+ private BoundRequestBuilder createRequest(TaskStateChange stateChange) {
+ return httpClient.preparePost(webhookInfo.getTargetURI().toString())
+ .setBody(stateChange.toJson())
+ .setSingleHeaders(webhookInfo.getHeaders())
+ .addHeader("Timestamp", Long.toString(Instant.now().toEpochMilli()));
}
/**
@@ -82,20 +96,49 @@ public class Webhook implements EventSubscriber {
// resend the entire state. This check also ensures that only whitelisted statuses will be sent
// to the configured endpoint.
if (stateChange.getOldState().isPresent() && isWhitelisted.apply(stateChange.getNewState())) {
+ attemptsCounter.incrementAndGet();
try {
- HttpPost post = createPostRequest(stateChange);
- // Using try-with-resources on closeable and following
- // https://hc.apache.org/httpcomponents-client-4.5.x/quickstart.html to make sure stream is
- // closed after we get back a response to not leak http connections.
- try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
- HttpEntity entity = httpResponse.getEntity();
- EntityUtils.consumeQuietly(entity);
- } catch (IOException exp) {
- LOG.error("Error sending a Webhook event", exp);
- }
- } catch (UnsupportedEncodingException exp) {
- LOG.error("HttpPost exception when creating an HTTP Post request", exp);
+ // We don't care about the response body, so only listen for the HTTP status code.
+ createRequest(stateChange).execute(new AsyncCompletionHandler<Integer>() {
+ @Override
+ public void onThrowable(Throwable t) {
+ errorsCounter.incrementAndGet();
+ LOG.error("Error sending a Webhook event", t);
+ }
+
+ @Override
+ public State onStatusReceived(HttpResponseStatus status) throws Exception {
+ if (status.getStatusCode() == HttpConstants.ResponseStatusCodes.OK_200) {
+ successCounter.incrementAndGet();
+ } else {
+ userErrorsCounter.incrementAndGet();
+ }
+
+ // Abort after we get the status because that is all we use for processing.
+ return State.ABORT;
+ }
+
+ @Override
+ public Integer onCompleted(Response response) throws Exception {
+ // We do not care about the full response.
+ return 0;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Error making Webhook request", e);
+ errorsCounter.incrementAndGet();
}
}
}
+
+ @Override
+ protected void startUp() throws Exception {
+ // No-op
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOG.info("Shutting down async Webhook client.");
+ httpClient.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
index da22c21..44789d6 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
@@ -129,6 +129,20 @@ public class WebhookInfo {
return this;
}
+ /**
+ * This method will add the supplied headers to the current headers.
+ *
+ * @param values The headers to add.
+ * @return The modified builder.
+ */
+ public WebhookInfoBuilder setHeaders(Map<String, String> values) {
+ for (Map.Entry<String, String> entry : values.entrySet()) {
+ setHeader(entry.getKey(), entry.getValue());
+ }
+
+ return this;
+ }
+
public WebhookInfoBuilder setTargetURL(String targetURL) {
this.targetURL = targetURL;
return this;
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
index 1f10af7..8c9ea05 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookModule.java
@@ -29,17 +29,18 @@ import org.apache.aurora.common.args.Arg;
import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.args.constraints.CanRead;
import org.apache.aurora.common.args.constraints.Exists;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+
/**
* Binding module for webhook management.
*/
@@ -70,24 +71,23 @@ public class WebhookModule extends AbstractModule {
protected void configure() {
if (enableWebhook) {
WebhookInfo webhookInfo = parseWebhookConfig(readWebhookFile());
- int timeout = webhookInfo.getConnectonTimeoutMsec();
- RequestConfig config = RequestConfig.custom()
- .setConnectTimeout(timeout) // establish connection with server eg time to TCP handshake.
- .setConnectionRequestTimeout(timeout) // get a connection from internal pool.
- .setSocketTimeout(timeout) // wait for data after connection was established.
+ DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
+ .setConnectTimeout(webhookInfo.getConnectonTimeoutMsec())
+ .setHandshakeTimeout(webhookInfo.getConnectonTimeoutMsec())
+ .setSslSessionTimeout(webhookInfo.getConnectonTimeoutMsec())
+ .setReadTimeout(webhookInfo.getConnectonTimeoutMsec())
+ .setRequestTimeout(webhookInfo.getConnectonTimeoutMsec())
+ .setKeepAliveStrategy(new DefaultKeepAliveStrategy())
.build();
- ConnectionKeepAliveStrategy connectionStrategy = new DefaultConnectionKeepAliveStrategy();
- CloseableHttpClient client =
- HttpClientBuilder.create()
- .setDefaultRequestConfig(config)
- // being explicit about using default Keep-Alive strategy.
- .setKeepAliveStrategy(connectionStrategy)
- .build();
+ AsyncHttpClient httpClient = asyncHttpClient(config);
bind(WebhookInfo.class).toInstance(webhookInfo);
- bind(CloseableHttpClient.class).toInstance(client);
+ bind(AsyncHttpClient.class).toInstance(httpClient);
PubsubEventModule.bindSubscriber(binder(), Webhook.class);
bind(Webhook.class).in(Singleton.class);
+
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+ .to(Webhook.class);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/main/resources/org/apache/aurora/scheduler/webhook.json
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/webhook.json b/src/main/resources/org/apache/aurora/scheduler/webhook.json
index b78c063..e645f64 100644
--- a/src/main/resources/org/apache/aurora/scheduler/webhook.json
+++ b/src/main/resources/org/apache/aurora/scheduler/webhook.json
@@ -3,6 +3,6 @@
"Content-Type": "application/vnd.kafka.json.v1+json",
"Producer-Type": "reliable"
},
- "targetURL": "http://localhost:5000/",
- "timeoutMsec": 50
+ "targetURL": "http://localhost:8080/",
+ "timeoutMsec": 5000
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0f1e6840/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
index 07f39fa..827aa2d 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
@@ -17,259 +17,352 @@ import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableMap;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import com.google.common.collect.ImmutableMap;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.WebhookInfo.WebhookInfoBuilder;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.easymock.Capture;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.asynchttpclient.AsyncHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class WebhookTest extends EasyMockTest {
+public class WebhookTest {
+ private static final String STATIC_URL = "http://localhost:8080/";
+ private static final Integer TIMEOUT = 5000;
+ private static final Map<String, String> HEADERS = ImmutableMap.of(
+ "Content-Type", "application/vnd.kafka.json.v1+json",
+ "Producer-Type", "reliable"
+ );
private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", TaskTestUtil.JOB);
private static final TaskStateChange CHANGE = TaskStateChange.initialized(TASK);
- private static final TaskStateChange CHANGE_WITH_OLD_STATE = TaskStateChange
+ private static final TaskStateChange CHANGE_OLD_STATE = TaskStateChange
.transition(TASK, ScheduleStatus.FAILED);
- private static final String CHANGE_JSON = CHANGE_WITH_OLD_STATE.toJson();
+ private static final TaskStateChange CHANGE_LOST = TaskStateChange.transition(
+ TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L), ScheduleStatus.LOST);
+ private static final String CHANGE_JSON = CHANGE_OLD_STATE.toJson();
+ private static final String CHANGE_LOST_JSON = CHANGE_LOST.toJson();
+ // Below are test fixtures for WebhookInfoBuilders. Callers will need to specify the desired
+ // targetURL and build manually to get the desired WebhookInfo. We do this because we allocate
+ // an ephemeral port for our test Jetty server, meaning we cannot specify WebhookInfo statically.
// Test fixture for WebhookInfo without a whitelist, thus all task statuses are implicitly
// whitelisted.
- private static final WebhookInfo WEBHOOK_INFO = WebhookInfo.newBuilder()
- .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
- .setHeader("Producer-Type", "reliable")
- .setTargetURL("http://localhost:5000/")
- .setTimeout(50)
- .build();
+ private static final WebhookInfoBuilder WEBHOOK_INFO_BUILDER = WebhookInfo
+ .newBuilder()
+ .setHeaders(HEADERS)
+ .setTimeout(TIMEOUT);
// Test fixture for WebhookInfo in which only "LOST" and "FAILED" task statuses are explicitly
// whitelisted.
- private static final WebhookInfo WEBHOOK_INFO_WITH_WHITELIST = WebhookInfo.newBuilder()
- .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
- .setHeader("Producer-Type", "reliable")
- .setTargetURL("http://localhost:5000/")
- .setTimeout(50)
+ private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WHITELIST_BUILDER = WebhookInfo
+ .newBuilder()
+ .setHeaders(HEADERS)
+ .setTimeout(TIMEOUT)
.addWhitelistedStatus("LOST")
- .addWhitelistedStatus("FAILED")
- .build();
+ .addWhitelistedStatus("FAILED");
// Test fixture for WebhookInfo in which all task statuses are whitelisted by wildcard character.
- private static final WebhookInfo WEBHOOK_INFO_WITH_WILDCARD_WHITELIST = WebhookInfo.newBuilder()
- .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
- .setHeader("Producer-Type", "reliable")
- .setTargetURL("http://localhost:5000/")
- .setTimeout(50)
- .addWhitelistedStatus("*")
- .build();
+ private static final WebhookInfoBuilder WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER =
+ WebhookInfo
+ .newBuilder()
+ .setHeaders(HEADERS)
+ .setTimeout(TIMEOUT)
+ .addWhitelistedStatus("*");
+
+ private Server jettyServer;
+ private AsyncHttpClient httpClient;
+ private FakeStatsProvider statsProvider;
+
+ /**
+ * Wrap the DefaultAsyncHttpClient for testing so we can complete futures synchronously before
+ * validating assertions. Otherwise, we would have to call `Thread.sleep` in our tests after
+ * each TaskStateChange.
+ */
+ static class WebhookAsyncHttpClientWrapper extends DefaultAsyncHttpClient {
- private CloseableHttpClient httpClient;
- private Webhook webhook;
+ WebhookAsyncHttpClientWrapper(DefaultAsyncHttpClientConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> executeRequest(org.asynchttpclient.Request request,
+ AsyncHandler<T> handler) {
+ ListenableFuture<T> future = super.executeRequest(request, handler);
+ try {
+ future.get();
+ future.done();
+ } catch (InterruptedException | ExecutionException e) {
+ // Ignore, future should not fail to resolve.
+ }
+ return future;
+ }
+ }
@Before
- public void setUp() {
- httpClient = createMock(CloseableHttpClient.class);
- webhook = new Webhook(httpClient, WEBHOOK_INFO);
+ public void setUp() throws Exception {
+ DefaultAsyncHttpClientConfig testConfig = new DefaultAsyncHttpClientConfig.Builder()
+ .setConnectTimeout(TIMEOUT)
+ .setHandshakeTimeout(TIMEOUT)
+ .setSslSessionTimeout(TIMEOUT)
+ .setReadTimeout(TIMEOUT)
+ .setRequestTimeout(TIMEOUT)
+ .setKeepAliveStrategy(new DefaultKeepAliveStrategy())
+ .build();
+ httpClient = new WebhookAsyncHttpClientWrapper(testConfig);
+ statsProvider = new FakeStatsProvider();
+ jettyServer = new Server(0); // Start Jetty server with ephemeral port
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ jettyServer.stop();
}
@Test
public void testTaskChangedStateNoOldState() throws Exception {
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
// Should be a noop as oldState is MIA so this test would have throw an exception.
// If it does not, then we are good.
- control.replay();
-
webhook.taskChangedState(CHANGE);
}
@Test
- public void testTaskChangedWithOldState() throws Exception {
- CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class);
- HttpEntity entity = createMock(HttpEntity.class);
-
- Capture<HttpPost> httpPostCapture = createCapture();
- expect(entity.isStreaming()).andReturn(false);
- expect(httpResponse.getEntity()).andReturn(entity);
- httpResponse.close();
- expectLastCall().once();
- expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse);
-
- control.replay();
-
- webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
-
- assertTrue(httpPostCapture.hasCaptured());
- assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
- assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), CHANGE_JSON);
- Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
- assertEquals(producerTypeHeader.length, 1);
- assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
- assertEquals(producerTypeHeader[0].getValue(), "reliable");
- Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type");
- assertEquals(contentTypeHeader.length, 1);
- assertEquals(contentTypeHeader[0].getName(), "Content-Type");
- assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json");
- assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
+ public void testTaskChangedWithOldStateSuccess() throws Exception {
+ jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+ jettyServer.start();
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+ webhook.taskChangedState(CHANGE_OLD_STATE);
+
+ assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+ assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
}
@Test
- public void testTaskChangeInWhiteList() throws Exception {
- CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class);
- HttpEntity entity = createMock(HttpEntity.class);
+ public void testTaskChangedWithOldStateUserError() throws Exception {
+ // We expect CHANGE_JSON but get CHANGE_LOST which causes an error code to be returned.
+ jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+ jettyServer.start();
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+ webhook.taskChangedState(CHANGE_LOST);
- Capture<HttpPost> httpPostCapture = createCapture();
- expect(entity.isStreaming()).andReturn(false);
- expect(httpResponse.getEntity()).andReturn(entity);
- httpResponse.close();
- expectLastCall().once();
- expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse);
+ assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+ assertEquals(1, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
+ }
- control.replay();
+ @Test
+ public void testTaskChangedWithOldStateError() throws Exception {
+ // We have a special handler here to cause a TimeoutException to trigger `onThrowable`
+ jettyServer.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ try {
+ Thread.sleep(TIMEOUT + 100);
+ } catch (InterruptedException e) {
+ // Should never get here.
+ }
+ }
+ });
+ jettyServer.start();
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
- // Verifying TaskStateChange in the whitelist is sent to the configured endpoint.
- Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
- TaskStateChange taskStateChangeInWhitelist = TaskStateChange
- .transition(TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L),
- ScheduleStatus.RUNNING);
- webhookWithWhitelist.taskChangedState(taskStateChangeInWhitelist);
-
- assertTrue(httpPostCapture.hasCaptured());
- assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
- assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()),
- taskStateChangeInWhitelist.toJson());
- Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
- assertEquals(producerTypeHeader.length, 1);
- assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
- assertEquals(producerTypeHeader[0].getValue(), "reliable");
- Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type");
- assertEquals(contentTypeHeader.length, 1);
- assertEquals(contentTypeHeader[0].getName(), "Content-Type");
- assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json");
- assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
+ webhook.taskChangedState(CHANGE_OLD_STATE);
+
+ assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+ assertEquals(1, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
}
@Test
- public void testTaskChangeNotInWhiteList() throws Exception {
- control.replay();
+ public void testTaskChangeInWhiteList() throws Exception {
+ // Verifying TaskStateChange in the whitelist is sent to the configured endpoint.
+ jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_LOST_JSON));
+ jettyServer.start();
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
- // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint.
- Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
- webhookWithWhitelist.taskChangedState(CHANGE_WITH_OLD_STATE);
+ webhook.taskChangedState(CHANGE_LOST);
+
+ assertEquals(1, statsProvider.getLongValue(Webhook.ATTEMPTS_STAT_NAME));
+ assertEquals(1, statsProvider.getLongValue(Webhook.SUCCESS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.ERRORS_STAT_NAME));
+ assertEquals(0, statsProvider.getLongValue(Webhook.USER_ERRORS_STAT_NAME));
}
@Test
- public void testCatchHttpClientException() throws Exception {
- // IOException should be silenced.
- Capture<HttpPost> httpPostCapture = createCapture();
- expect(httpClient.execute(capture(httpPostCapture)))
- .andThrow(new IOException());
- control.replay();
-
- webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
+ public void testTaskChangeNotInWhiteList() throws Exception {
+ // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint.
+ jettyServer.setHandler(createHandlerThatExpectsContent(CHANGE_JSON));
+ jettyServer.start();
+ WebhookInfo webhookInfo = buildWebhookInfoWithJettyPort(WEBHOOK_INFO_WITH_WHITELIST_BUILDER);
+ Webhook webhook = new Webhook(httpClient, webhookInfo, statsProvider);
+
+ webhook.taskChangedState(CHANGE_OLD_STATE);
}
@Test
public void testParsingWebhookInfo() throws Exception {
+ WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER
+ .setTargetURL(STATIC_URL)
+ .build();
+
WebhookInfo parsedWebhookInfo = WebhookModule.parseWebhookConfig(
WebhookModule.readWebhookFile());
// Verifying the WebhookInfo parsed from webhook.json file is identical to the WebhookInfo
// built from WebhookInfoBuilder.
- assertEquals(parsedWebhookInfo.toString(), WEBHOOK_INFO.toString());
+ assertEquals(parsedWebhookInfo.toString(), webhookInfo.toString());
// Verifying all attributes were parsed correctly.
- assertEquals(parsedWebhookInfo.getHeaders(), WEBHOOK_INFO.getHeaders());
- assertEquals(parsedWebhookInfo.getTargetURI(), WEBHOOK_INFO.getTargetURI());
+ assertEquals(parsedWebhookInfo.getHeaders(), webhookInfo.getHeaders());
+ assertEquals(parsedWebhookInfo.getTargetURI(), webhookInfo.getTargetURI());
assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(),
- WEBHOOK_INFO.getConnectonTimeoutMsec());
- assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), WEBHOOK_INFO.getWhitelistedStatuses());
- control.replay();
+ webhookInfo.getConnectonTimeoutMsec());
+ assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), webhookInfo.getWhitelistedStatuses());
}
@Test
public void testWebhookInfo() throws Exception {
- assertEquals(WEBHOOK_INFO.toString(),
+ WebhookInfo webhookInfo = WEBHOOK_INFO_BUILDER
+ .setTargetURL(STATIC_URL)
+ .build();
+
+ assertEquals(webhookInfo.toString(),
"WebhookInfo{headers={"
+ "Content-Type=application/vnd.kafka.json.v1+json, "
+ "Producer-Type=reliable"
+ "}, "
- + "targetURI=http://localhost:5000/, "
- + "connectTimeoutMsec=50, "
+ + "targetURI=http://localhost:8080/, "
+ + "connectTimeoutMsec=5000, "
+ "whitelistedStatuses=null"
+ "}");
// Verifying all attributes were set correctly.
- Map<String, String> headers = ImmutableMap.of(
- "Content-Type", "application/vnd.kafka.json.v1+json",
- "Producer-Type", "reliable");
- assertEquals(WEBHOOK_INFO.getHeaders(), headers);
- URI targetURI = new URI("http://localhost:5000/");
- assertEquals(WEBHOOK_INFO.getTargetURI(), targetURI);
- Integer timeoutMsec = 50;
- assertEquals(WEBHOOK_INFO.getConnectonTimeoutMsec(), timeoutMsec);
- assertFalse(WEBHOOK_INFO.getWhitelistedStatuses().isPresent());
- control.replay();
+ assertEquals(webhookInfo.getHeaders(), HEADERS);
+ assertEquals(webhookInfo.getTargetURI(), URI.create(STATIC_URL));
+ assertEquals(webhookInfo.getConnectonTimeoutMsec(), TIMEOUT);
+ assertFalse(webhookInfo.getWhitelistedStatuses().isPresent());
}
@Test
public void testWebhookInfoWithWhiteList() throws Exception {
- assertEquals(WEBHOOK_INFO_WITH_WHITELIST.toString(),
+ WebhookInfo webhookInfoWithWhitelist = WEBHOOK_INFO_WITH_WHITELIST_BUILDER
+ .setTargetURL(STATIC_URL)
+ .build();
+
+ assertEquals(webhookInfoWithWhitelist.toString(),
"WebhookInfo{headers={"
+ "Content-Type=application/vnd.kafka.json.v1+json, "
+ "Producer-Type=reliable"
+ "}, "
- + "targetURI=http://localhost:5000/, "
- + "connectTimeoutMsec=50, "
+ + "targetURI=http://localhost:8080/, "
+ + "connectTimeoutMsec=5000, "
+ "whitelistedStatuses=[LOST, FAILED]"
+ "}");
// Verifying all attributes were set correctly.
- Map<String, String> headers = ImmutableMap.of(
- "Content-Type", "application/vnd.kafka.json.v1+json",
- "Producer-Type", "reliable");
- assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getHeaders(), headers);
- URI targetURI = new URI("http://localhost:5000/");
- assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getTargetURI(), targetURI);
- Integer timeoutMsec = 50;
- assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
- List<ScheduleStatus> statuses = WEBHOOK_INFO_WITH_WHITELIST.getWhitelistedStatuses().get();
+ assertEquals(webhookInfoWithWhitelist.getHeaders(), HEADERS);
+ assertEquals(webhookInfoWithWhitelist.getTargetURI(), URI.create(STATIC_URL));
+ assertEquals(webhookInfoWithWhitelist.getConnectonTimeoutMsec(), TIMEOUT);
+ List<ScheduleStatus> statuses = webhookInfoWithWhitelist.getWhitelistedStatuses().get();
assertEquals(statuses.size(), 2);
assertEquals(statuses.get(0), ScheduleStatus.LOST);
assertEquals(statuses.get(1), ScheduleStatus.FAILED);
- control.replay();
}
@Test
public void testWebhookInfoWithWildcardWhitelist() throws Exception {
- assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.toString(),
+ WebhookInfo webhookInfoWithWildcardWhitelist = WEBHOOK_INFO_WITH_WILDCARD_WHITELIST_BUILDER
+ .setTargetURL(STATIC_URL)
+ .build();
+
+ assertEquals(webhookInfoWithWildcardWhitelist.toString(),
"WebhookInfo{headers={"
+ "Content-Type=application/vnd.kafka.json.v1+json, "
+ "Producer-Type=reliable"
+ "}, "
- + "targetURI=http://localhost:5000/, "
- + "connectTimeoutMsec=50, "
+ + "targetURI=http://localhost:8080/, "
+ + "connectTimeoutMsec=5000, "
+ "whitelistedStatuses=null"
+ "}");
// Verifying all attributes were set correctly.
- Map<String, String> headers = ImmutableMap.of(
- "Content-Type", "application/vnd.kafka.json.v1+json",
- "Producer-Type", "reliable");
- assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getHeaders(), headers);
- URI targetURI = new URI("http://localhost:5000/");
- assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getTargetURI(), targetURI);
- Integer timeoutMsec = 50;
- assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
- assertFalse(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getWhitelistedStatuses().isPresent());
- control.replay();
+ assertEquals(webhookInfoWithWildcardWhitelist.getHeaders(), HEADERS);
+ assertEquals(webhookInfoWithWildcardWhitelist.getTargetURI(), URI.create(STATIC_URL));
+ assertEquals(webhookInfoWithWildcardWhitelist.getConnectonTimeoutMsec(), TIMEOUT);
+ assertFalse(webhookInfoWithWildcardWhitelist.getWhitelistedStatuses().isPresent());
+ }
+
+ /**
+ * Create a Jetty handler that expects a request with a given content body.
+ *
+ * <p>The handler answers with {@code SC_OK} when {@code validateRequest} accepts the request and
+ * the request body equals {@code expected}; in every other case it answers with
+ * {@code SC_INTERNAL_SERVER_ERROR}. Either way the request is marked as handled.
+ *
+ * @param expected the body the incoming request must carry. The incoming body is read line by
+ * line and the lines are concatenated without any separator before being compared.
+ * @return a handler that can be installed with {@code jettyServer.setHandler(...)}.
+ */
+ private AbstractHandler createHandlerThatExpectsContent(String expected) {
+ return new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ String body = request.getReader().lines().collect(Collectors.joining()); // Line terminators are dropped by lines().
+ if (validateRequest(request) && body.equals(expected)) {
+ response.setStatus(HttpServletResponse.SC_OK);
+ } else {
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ baseRequest.setHandled(true);
+ }
+ };
+ }
+
+ /**
+ * Validate that the request is what we are expecting to send out (e.g. POST, headers).
+ *
+ * @param request the incoming servlet request to inspect.
+ * @return {@code true} only if the request method is POST and every entry of {@code HEADERS}
+ * appears on the request with an equal value; {@code false} otherwise. A header that is
+ * missing from the request fails the check. Request headers that are not listed in
+ * {@code HEADERS} are not examined.
+ */
+ private boolean validateRequest(HttpServletRequest request) {
+ // Validate the HTTP method first: anything other than POST is rejected.
+ if (!request.getMethod().equals("POST")) {
+ return false;
+ }
+
+ for (Map.Entry<String, String> header : HEADERS.entrySet()) {
+ String expectedKey = header.getKey();
+ String expectedValue = header.getValue();
+
+ if (!expectedValue.equals(request.getHeader(expectedKey))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Builds a {@code WebhookInfo} for testing against the running Jetty server. The target URL is
+ * {@code http://localhost:<port>}, where the port is taken from {@code jettyServer.getURI()}.
+ * {@code jettyServer} should have been started before this method is called.
+ *
+ * @param builder the builder to finish; {@code setTargetURL} is invoked on it with the Jetty URL
+ * before {@code build()} is called.
+ * @return the {@code WebhookInfo} produced by the builder.
+ */
+ private WebhookInfo buildWebhookInfoWithJettyPort(WebhookInfoBuilder builder) {
+ String fullUrl = String.format("http://localhost:%d", jettyServer.getURI().getPort());
+ return builder
+ .setTargetURL(fullUrl)
+ .build();
}
}