You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by sa...@apache.org on 2017/06/14 00:27:45 UTC

aurora git commit: Add a whitelist for TaskStateChange events in Webhook.

Repository: aurora
Updated Branches:
  refs/heads/master cb86e8358 -> 0001f9091


Add a whitelist for TaskStateChange events in Webhook.

Aurora Scheduler has a webhook module that watches all TaskStateChanges and send events to configured endpoint. This will flood the endpoint with a lot of noise if we only care about certain types of TaskStateChange event(e.g. task state change from RUNNING -> LOST). This CR allows user to provide a whitelist of TaskStateChange event types in their webhook configuration file, so that the webhook will only post these events to the configured endpoint.

Testing Done:
./build-support/jenkins/build.sh

This change is backward compatible. By default, all TaskStateChange statuses will be matched and posted to the configured endpoint.

The user can also match all TaskStateChange statuses using a wildcard character "*" in webhook.json like below:
```
{
  "headers": {
    "Content-Type": "application/vnd.kafka.json.v1+json",
    "Producer-Type": "reliable"
  },
  "targetURL": "http://localhost:5000/",
  "timeoutMsec": 50,
  "statuses": ["*"]
}
```

If they are only interested in TaskStateChange statuses: LOST, FAILED, they can provide them in the whitelist:
```
{
  "headers": {
    "Content-Type": "application/vnd.kafka.json.v1+json",
    "Producer-Type": "reliable"
  },
  "targetURL": "http://localhost:5000/",
  "timeoutMsec": 50,
  "statuses": ["LOST", "FAILED"]
}
```

Bugs closed: AURORA-1934

Reviewed at https://reviews.apache.org/r/59940/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0001f909
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0001f909
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0001f909

Branch: refs/heads/master
Commit: 0001f90914b418859eb9fa86903e89a793e48e9b
Parents: cb86e83
Author: Kai Huang <te...@hotmail.com>
Authored: Tue Jun 13 17:26:51 2017 -0700
Committer: Santhosh Kumar <ss...@twitter.com>
Committed: Tue Jun 13 17:26:51 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   3 +
 docs/features/webhooks.md                       |  32 ++++
 .../apache/aurora/scheduler/events/Webhook.java |  14 +-
 .../aurora/scheduler/events/WebhookInfo.java    |  90 +++++++++-
 .../aurora/scheduler/events/WebhookTest.java    | 171 +++++++++++++++++--
 5 files changed, 286 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/0001f909/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 87283de..e032f79 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -54,6 +54,9 @@
 - Modified job update behavior to create new instances, then update existing instances, and then
   kill unwanted instances. Previously, a job update would modify each instance in the order of
   their instance ID.
+- Added ability to whitelist TaskStateChanges in the webhook configuration file. You can specify
+  a list of desired TaskStateChanges(represented by their task statuses) to be sent to a configured
+  endpoint.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/0001f909/docs/features/webhooks.md
----------------------------------------------------------------------
diff --git a/docs/features/webhooks.md b/docs/features/webhooks.md
index a060975..286746e 100644
--- a/docs/features/webhooks.md
+++ b/docs/features/webhooks.md
@@ -78,3 +78,35 @@ And an example of a response that you will get back:
         },
         "oldState":{}}
 ```
+
+By default, the webhook watches all TaskStateChanges and sends events to configured endpoint. If you
+are only interested in certain types of TaskStateChange (transition to `LOST` or `FAILED` statuses),
+you can specify a whitelist of the desired task statuses in webhook.json. The webhook will only send
+the corresponding events for the whitelisted statuses to the configured endpoint.
+
+```json
+{
+  "headers": {
+    "Content-Type": "application/vnd.kafka.json.v1+json",
+    "Producer-Type": "reliable"
+  },
+  "targetURL": "http://localhost:5000/",
+  "timeoutMsec": 50,
+  "statuses": ["LOST", "FAILED"]
+}
+```
+
+If you want to whitelist all TaskStateChanges, you can add a wildcard character `*` to your whitelist
+like below, or simply leave out the `statuses` field in webhook.json.
+
+```json
+{
+  "headers": {
+    "Content-Type": "application/vnd.kafka.json.v1+json",
+    "Producer-Type": "reliable"
+  },
+  "targetURL": "http://localhost:5000/",
+  "timeoutMsec": 50,
+  "statuses": ["*"]
+}
+```

http://git-wip-us.apache.org/repos/asf/aurora/blob/0001f909/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
index 3868779..05f46a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/Webhook.java
@@ -17,10 +17,12 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.time.Instant;
 
+import com.google.common.base.Predicate;
 import com.google.common.eventbus.Subscribe;
 
 import com.google.inject.Inject;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.http.HttpEntity;
@@ -41,11 +43,16 @@ public class Webhook implements EventSubscriber {
 
   private final WebhookInfo webhookInfo;
   private final CloseableHttpClient httpClient;
+  private final Predicate<ScheduleStatus> isWhitelisted;
 
   @Inject
   Webhook(CloseableHttpClient httpClient, WebhookInfo webhookInfo) {
     this.webhookInfo = webhookInfo;
     this.httpClient = httpClient;
+    // A task status is whitelisted if: a) the whitelist is absent, or b) the task status is
+    // explicitly specified in the whitelist.
+    this.isWhitelisted = status -> !webhookInfo.getWhitelistedStatuses().isPresent()
+        || webhookInfo.getWhitelistedStatuses().get().contains(status);
     LOG.info("Webhook enabled with info" + this.webhookInfo);
   }
 
@@ -72,8 +79,9 @@ public class Webhook implements EventSubscriber {
   public void taskChangedState(TaskStateChange stateChange) {
     LOG.debug("Got an event: {}", stateChange);
     // Old state is not present because a scheduler just failed over. In that case we do not want to
-    // resend the entire state.
-    if (stateChange.getOldState().isPresent()) {
+    // resend the entire state. This check also ensures that only whitelisted statuses will be sent
+    // to the configured endpoint.
+    if (stateChange.getOldState().isPresent() && isWhitelisted.apply(stateChange.getNewState())) {
       try {
         HttpPost post = createPostRequest(stateChange);
         // Using try-with-resources on closeable and following
@@ -82,7 +90,7 @@ public class Webhook implements EventSubscriber {
         try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
           HttpEntity entity = httpResponse.getEntity();
           EntityUtils.consumeQuietly(entity);
-        }  catch (IOException exp) {
+        } catch (IOException exp) {
           LOG.error("Error sending a Webhook event", exp);
         }
       } catch (UnsupportedEncodingException exp) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/0001f909/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
index 37c0d79..da22c21 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/WebhookInfo.java
@@ -15,12 +15,21 @@ package org.apache.aurora.scheduler.events;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
 
@@ -33,6 +42,7 @@ public class WebhookInfo {
   private final Integer connectTimeoutMsec;
   private final Map<String, String> headers;
   private final URI targetURI;
+  private final Optional<List<ScheduleStatus>> whitelistedStatuses;
 
   /**
    * Return key:value pairs of headers to set for every connection.
@@ -61,23 +71,93 @@ public class WebhookInfo {
     return connectTimeoutMsec;
   }
 
+  /**
+   * Returns an optional list of task statuses to be subscribed by the webhook.
+   *
+   * @return an optional list of ScheduleStatus.
+   */
+  Optional<List<ScheduleStatus>> getWhitelistedStatuses() {
+    return whitelistedStatuses;
+  }
+
+  private static final Predicate<List<String>> IS_ALL_WHITELISTED = statuses ->
+      !Optional.fromNullable(statuses).isPresent()
+          || Optional.fromNullable(statuses).get().stream().anyMatch(status -> "*".equals(status));
+
   @JsonCreator
   public WebhookInfo(
        @JsonProperty("headers") Map<String, String> headers,
        @JsonProperty("targetURL") String targetURL,
-       @JsonProperty("timeoutMsec") Integer timeout) throws URISyntaxException {
+       @JsonProperty("timeoutMsec") Integer timeout,
+       @JsonProperty("statuses") List<String> statuses) throws URISyntaxException {
 
     this.headers = ImmutableMap.copyOf(headers);
     this.targetURI = new URI(requireNonNull(targetURL));
     this.connectTimeoutMsec = requireNonNull(timeout);
+    this.whitelistedStatuses = IS_ALL_WHITELISTED.apply(statuses) ? Optional.absent()
+        : Optional.fromNullable(statuses).transform(
+            s -> ImmutableList.copyOf(s.stream()
+                .map(ScheduleStatus::valueOf)
+                .collect(Collectors.toList())));
+  }
+
+  WebhookInfo(WebhookInfoBuilder builder) throws URISyntaxException {
+    this(builder.headers, builder.targetURL, builder.timeout, builder.statuses);
+  }
+
+  @VisibleForTesting
+  static WebhookInfoBuilder newBuilder() {
+    return new WebhookInfoBuilder();
+  }
+
+  static class WebhookInfoBuilder {
+    private Integer timeout;
+    private Map<String, String> headers;
+    private String targetURL;
+    private List<String> statuses;
+
+    public WebhookInfoBuilder setTimeout(Integer timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
+    public WebhookInfoBuilder setHeader(String key, String value) {
+      if (headers == null) {
+        headers = new LinkedHashMap<>();
+      }
+      headers.put(key, value);
+      return this;
+    }
+
+    public WebhookInfoBuilder setTargetURL(String targetURL) {
+      this.targetURL = targetURL;
+      return this;
+    }
+
+    public WebhookInfoBuilder addWhitelistedStatus(String status) {
+      if (statuses == null) {
+        statuses = new ArrayList<>();
+      }
+      statuses.add(status);
+      return this;
+    }
+
+    public WebhookInfo build() {
+      try {
+        return new WebhookInfo(this);
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-      .add("headers", headers.toString())
-      .add("targetURI", targetURI.toString())
-      .add("connectTimeoutMsec", connectTimeoutMsec)
-      .toString();
+        .add("headers", headers.toString())
+        .add("targetURI", targetURI.toString())
+        .add("connectTimeoutMsec", connectTimeoutMsec)
+        .add("whitelistedStatuses", whitelistedStatuses.orNull())
+        .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/0001f909/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
index e8335d9..07f39fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/WebhookTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.events;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,25 +40,51 @@ import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class WebhookTest extends EasyMockTest {
 
   private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", TaskTestUtil.JOB);
-  private final TaskStateChange change = TaskStateChange.initialized(TASK);
-  private final TaskStateChange changeWithOldState = TaskStateChange
+  private static final TaskStateChange CHANGE = TaskStateChange.initialized(TASK);
+  private static final TaskStateChange CHANGE_WITH_OLD_STATE = TaskStateChange
       .transition(TASK, ScheduleStatus.FAILED);
-  private final String changeJson = changeWithOldState.toJson();
+  private static final String CHANGE_JSON = CHANGE_WITH_OLD_STATE.toJson();
+  // Test fixture for WebhookInfo without a whitelist, thus all task statuses are implicitly
+  // whitelisted.
+  private static final WebhookInfo WEBHOOK_INFO = WebhookInfo.newBuilder()
+      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
+      .setHeader("Producer-Type", "reliable")
+      .setTargetURL("http://localhost:5000/")
+      .setTimeout(50)
+      .build();
+  // Test fixture for WebhookInfo in which only "LOST" and "FAILED" task statuses are explicitly
+  // whitelisted.
+  private static final WebhookInfo WEBHOOK_INFO_WITH_WHITELIST = WebhookInfo.newBuilder()
+      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
+      .setHeader("Producer-Type", "reliable")
+      .setTargetURL("http://localhost:5000/")
+      .setTimeout(50)
+      .addWhitelistedStatus("LOST")
+      .addWhitelistedStatus("FAILED")
+      .build();
+  // Test fixture for WebhookInfo in which all task statuses are whitelisted by wildcard character.
+  private static final WebhookInfo WEBHOOK_INFO_WITH_WILDCARD_WHITELIST = WebhookInfo.newBuilder()
+      .setHeader("Content-Type", "application/vnd.kafka.json.v1+json")
+      .setHeader("Producer-Type", "reliable")
+      .setTargetURL("http://localhost:5000/")
+      .setTimeout(50)
+      .addWhitelistedStatus("*")
+      .build();
 
   private CloseableHttpClient httpClient;
   private Webhook webhook;
 
   @Before
   public void setUp() {
-    WebhookInfo webhookInfo = WebhookModule.parseWebhookConfig(WebhookModule.readWebhookFile());
     httpClient = createMock(CloseableHttpClient.class);
-    webhook = new Webhook(httpClient, webhookInfo);
+    webhook = new Webhook(httpClient, WEBHOOK_INFO);
   }
 
   @Test
@@ -65,7 +92,8 @@ public class WebhookTest extends EasyMockTest {
     // Should be a noop as oldState is MIA so this test would have throw an exception.
     // If it does not, then we are good.
     control.replay();
-    webhook.taskChangedState(change);
+
+    webhook.taskChangedState(CHANGE);
   }
 
   @Test
@@ -82,11 +110,11 @@ public class WebhookTest extends EasyMockTest {
 
     control.replay();
 
-    webhook.taskChangedState(changeWithOldState);
+    webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
 
     assertTrue(httpPostCapture.hasCaptured());
     assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
-    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), changeJson);
+    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()), CHANGE_JSON);
     Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
     assertEquals(producerTypeHeader.length, 1);
     assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
@@ -99,6 +127,51 @@ public class WebhookTest extends EasyMockTest {
   }
 
   @Test
+  public void testTaskChangeInWhiteList() throws Exception {
+    CloseableHttpResponse httpResponse = createMock(CloseableHttpResponse.class);
+    HttpEntity entity = createMock(HttpEntity.class);
+
+    Capture<HttpPost> httpPostCapture = createCapture();
+    expect(entity.isStreaming()).andReturn(false);
+    expect(httpResponse.getEntity()).andReturn(entity);
+    httpResponse.close();
+    expectLastCall().once();
+    expect(httpClient.execute(capture(httpPostCapture))).andReturn(httpResponse);
+
+    control.replay();
+
+    // Verifying TaskStateChange in the whitelist is sent to the configured endpoint.
+    Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
+    TaskStateChange taskStateChangeInWhitelist = TaskStateChange
+        .transition(TaskTestUtil.addStateTransition(TASK, ScheduleStatus.LOST, 1L),
+            ScheduleStatus.RUNNING);
+    webhookWithWhitelist.taskChangedState(taskStateChangeInWhitelist);
+
+    assertTrue(httpPostCapture.hasCaptured());
+    assertEquals(httpPostCapture.getValue().getURI(), new URI("http://localhost:5000/"));
+    assertEquals(EntityUtils.toString(httpPostCapture.getValue().getEntity()),
+        taskStateChangeInWhitelist.toJson());
+    Header[] producerTypeHeader = httpPostCapture.getValue().getHeaders("Producer-Type");
+    assertEquals(producerTypeHeader.length, 1);
+    assertEquals(producerTypeHeader[0].getName(), "Producer-Type");
+    assertEquals(producerTypeHeader[0].getValue(), "reliable");
+    Header[] contentTypeHeader = httpPostCapture.getValue().getHeaders("Content-Type");
+    assertEquals(contentTypeHeader.length, 1);
+    assertEquals(contentTypeHeader[0].getName(), "Content-Type");
+    assertEquals(contentTypeHeader[0].getValue(), "application/vnd.kafka.json.v1+json");
+    assertNotNull(httpPostCapture.getValue().getHeaders("Timestamp"));
+  }
+
+  @Test
+  public void testTaskChangeNotInWhiteList() throws Exception {
+    control.replay();
+
+    // Verifying TaskStateChange not in the whitelist is not sent to the configured endpoint.
+    Webhook webhookWithWhitelist = new Webhook(httpClient, WEBHOOK_INFO_WITH_WHITELIST);
+    webhookWithWhitelist.taskChangedState(CHANGE_WITH_OLD_STATE);
+  }
+
+  @Test
   public void testCatchHttpClientException() throws Exception {
     // IOException should be silenced.
     Capture<HttpPost> httpPostCapture = createCapture();
@@ -106,31 +179,97 @@ public class WebhookTest extends EasyMockTest {
         .andThrow(new IOException());
     control.replay();
 
-    webhook.taskChangedState(changeWithOldState);
+    webhook.taskChangedState(CHANGE_WITH_OLD_STATE);
   }
 
   @Test
-  public void testWebhookInfo() throws Exception {
+  public void testParsingWebhookInfo() throws Exception {
     WebhookInfo parsedWebhookInfo = WebhookModule.parseWebhookConfig(
         WebhookModule.readWebhookFile());
-    assertEquals(parsedWebhookInfo.toString(),
+    // Verifying the WebhookInfo parsed from webhook.json file is identical to the WebhookInfo
+    // built from WebhookInfoBuilder.
+    assertEquals(parsedWebhookInfo.toString(), WEBHOOK_INFO.toString());
+    // Verifying all attributes were parsed correctly.
+    assertEquals(parsedWebhookInfo.getHeaders(), WEBHOOK_INFO.getHeaders());
+    assertEquals(parsedWebhookInfo.getTargetURI(), WEBHOOK_INFO.getTargetURI());
+    assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(),
+        WEBHOOK_INFO.getConnectonTimeoutMsec());
+    assertEquals(parsedWebhookInfo.getWhitelistedStatuses(), WEBHOOK_INFO.getWhitelistedStatuses());
+    control.replay();
+  }
+
+  @Test
+  public void testWebhookInfo() throws Exception {
+    assertEquals(WEBHOOK_INFO.toString(),
         "WebhookInfo{headers={"
             + "Content-Type=application/vnd.kafka.json.v1+json, "
             + "Producer-Type=reliable"
             + "}, "
             + "targetURI=http://localhost:5000/, "
-            + "connectTimeoutMsec=50"
+            + "connectTimeoutMsec=50, "
+            + "whitelistedStatuses=null"
             + "}");
-    // Verifying all attributes were parsed correctly.
+    // Verifying all attributes were set correctly.
     Map<String, String> headers = ImmutableMap.of(
         "Content-Type", "application/vnd.kafka.json.v1+json",
         "Producer-Type", "reliable");
-    assertEquals(parsedWebhookInfo.getHeaders(), headers);
+    assertEquals(WEBHOOK_INFO.getHeaders(), headers);
     URI targetURI = new URI("http://localhost:5000/");
-    assertEquals(parsedWebhookInfo.getTargetURI(), targetURI);
+    assertEquals(WEBHOOK_INFO.getTargetURI(), targetURI);
     Integer timeoutMsec = 50;
-    assertEquals(parsedWebhookInfo.getConnectonTimeoutMsec(), timeoutMsec);
+    assertEquals(WEBHOOK_INFO.getConnectonTimeoutMsec(), timeoutMsec);
+    assertFalse(WEBHOOK_INFO.getWhitelistedStatuses().isPresent());
+    control.replay();
+  }
 
+  @Test
+  public void testWebhookInfoWithWhiteList() throws Exception {
+    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.toString(),
+        "WebhookInfo{headers={"
+            + "Content-Type=application/vnd.kafka.json.v1+json, "
+            + "Producer-Type=reliable"
+            + "}, "
+            + "targetURI=http://localhost:5000/, "
+            + "connectTimeoutMsec=50, "
+            + "whitelistedStatuses=[LOST, FAILED]"
+            + "}");
+    // Verifying all attributes were set correctly.
+    Map<String, String> headers = ImmutableMap.of(
+        "Content-Type", "application/vnd.kafka.json.v1+json",
+        "Producer-Type", "reliable");
+    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getHeaders(), headers);
+    URI targetURI = new URI("http://localhost:5000/");
+    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getTargetURI(), targetURI);
+    Integer timeoutMsec = 50;
+    assertEquals(WEBHOOK_INFO_WITH_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
+    List<ScheduleStatus> statuses = WEBHOOK_INFO_WITH_WHITELIST.getWhitelistedStatuses().get();
+    assertEquals(statuses.size(), 2);
+    assertEquals(statuses.get(0), ScheduleStatus.LOST);
+    assertEquals(statuses.get(1), ScheduleStatus.FAILED);
+    control.replay();
+  }
+
+  @Test
+  public void testWebhookInfoWithWildcardWhitelist() throws Exception {
+    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.toString(),
+        "WebhookInfo{headers={"
+            + "Content-Type=application/vnd.kafka.json.v1+json, "
+            + "Producer-Type=reliable"
+            + "}, "
+            + "targetURI=http://localhost:5000/, "
+            + "connectTimeoutMsec=50, "
+            + "whitelistedStatuses=null"
+            + "}");
+    // Verifying all attributes were set correctly.
+    Map<String, String> headers = ImmutableMap.of(
+        "Content-Type", "application/vnd.kafka.json.v1+json",
+        "Producer-Type", "reliable");
+    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getHeaders(), headers);
+    URI targetURI = new URI("http://localhost:5000/");
+    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getTargetURI(), targetURI);
+    Integer timeoutMsec = 50;
+    assertEquals(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getConnectonTimeoutMsec(), timeoutMsec);
+    assertFalse(WEBHOOK_INFO_WITH_WILDCARD_WHITELIST.getWhitelistedStatuses().isPresent());
     control.replay();
   }
 }