You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2017/11/16 09:56:41 UTC

[04/32] sentry git commit: SENTRY-2027: Create mechanism of delivering commands via WebUI (Vadim Spector, reviewed by Sergio Pena and Arjun Mishra)

SENTRY-2027: Create mechanism of delivering commands via WebUI (Vadim Spector, reviewed by Sergio Pena and Arjun Mishra)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/64476a74
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/64476a74
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/64476a74

Branch: refs/heads/akolb-cli
Commit: 64476a7478c8f9aec1ba0ad4388d15b0bb6db352
Parents: c6723a8
Author: Vadim Spector <vs...@cloudera.com>
Authored: Thu Nov 2 11:25:14 2017 -0700
Committer: Vadim Spector <vs...@cloudera.com>
Committed: Thu Nov 2 11:25:14 2017 -0700

----------------------------------------------------------------------
 .../apache/sentry/core/common/utils/PubSub.java | 178 ++++++++++++++++++
 .../db/service/thrift/PubSubServlet.java        | 128 +++++++++++++
 .../db/service/thrift/SentryWebServer.java      |   5 +
 .../sentry/service/thrift/ServiceConstants.java |   3 +
 .../service/thrift/TestSentryServerPubSub.java  | 181 +++++++++++++++++++
 .../thrift/SentryServiceIntegrationBase.java    |   1 +
 6 files changed, 496 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java
new file mode 100644
index 0000000..eeb8f80
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PubSub.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is a simple publish-subscribe implementation for internal
+ * communication between Sentry components. It's a singleton class.
+ * <p>
+ * For the initial set of use cases, publish events are expected to be
+ * extremely rare, so no the data structures have been selected with no
+ * consideration for high concurrency.
+ */
+public final class PubSub {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PubSub.class);
+
+  private final Map<Topic,Set<Subscriber>> subscriberMap = new HashMap<>();
+
+  private static PubSub instance;
+
+  /**
+   * Subscriber callback interface.
+   * The same subscriber can subscribe to multiple topics,
+   * so callback API includes both topic and message.
+   */
+  public interface Subscriber {
+    void onMessage(Topic topic, String message);
+  }
+
+  /**
+   * Enumerated topics one can subscribe to.
+   * To be expanded as needed.
+   */
+  public enum Topic {
+    HDFS_SYNC_HMS("hdfs-sync-hms"), // upcoming feature, triggering HDFS sync between HMS and Sentry
+    HDFS_SYNC_NN("hdfs-sync-nn"); // upcoming feature, triggering HDFS sync between Sentry and NameNode
+
+    private final String name;
+
+    private static final Map<String,Topic> map = new HashMap<>();
+    static {
+      for (Topic t : Topic.values()) {
+        map.put(t.name, t);
+      }
+    }
+
+    public static Topic fromString(String name) {
+      Preconditions.checkNotNull("Enum name cannot be null", name);
+      name = name.trim().toLowerCase();
+      if (map.containsKey(name)) {
+        return map.get(name);
+      }
+      throw new NoSuchElementException(name + " not found");
+    }
+
+    private Topic(String name) {
+      this.name = name.toLowerCase();
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  /**
+   * Public factory method to guarantee singleton
+   */
+  public static synchronized PubSub getInstance() {
+    if (instance != null) {
+      LOGGER.info(instance + " requested");
+    } else {
+      instance = new PubSub();
+      LOGGER.info(instance + " created");
+    }
+    return instance;
+  }
+
+  // declare private to prevent multiple class instantiation
+  private PubSub() {
+  }
+
+  /**
+   * Publish message on given topic. Message is optional.
+   */
+  public synchronized void publish(Topic topic, String message) {
+    Preconditions.checkNotNull(topic, "Topic cannot be null");
+
+    Set<Subscriber> subscribers = subscriberMap.get(topic);
+    if (subscribers == null) {
+      throw new IllegalArgumentException("cannot publish to unknown topic " + topic
+        + ", existing topics " + subscriberMap.keySet());
+    }
+
+    for (Subscriber subscriber : subscribers) {
+      // Faulire of one subscriber to process message delivery should not affect
+      // message delivery to other subscribers, therefore using try-catch.
+      try {
+        subscriber.onMessage(topic, message);
+      } catch (Exception e) {
+        LOGGER.error("Topic " + topic + ", message " + message + ", delivery error", e);
+      }
+    }
+    LOGGER.info("Topic " + topic + ", message " + message + ": " + subscribers.size() + " subscribers called");
+  }
+
+  /**
+   * Subscribe to given topic.
+   */
+  public synchronized void subscribe(Topic topic, Subscriber subscriber) {
+    Preconditions.checkNotNull(topic, "Topic cannot be null");
+    Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic);
+
+    Set<Subscriber> subscribers = subscriberMap.get(topic);
+    if (subscribers == null) {
+      LOGGER.info("new topic " + topic);
+      subscriberMap.put(topic, subscribers = new HashSet<Subscriber>());
+    }
+    subscribers.add(subscriber);
+    LOGGER.info("Topic " + topic + ", added subscriber " + subscriber + ", total topic subscribers: " + subscribers.size());
+  }
+
+  /**
+   * Unsubscribe from given topic. If the last subscriber, remove the topic.
+   */
+  public synchronized void unsubscribe(Topic topic, Subscriber subscriber) {
+    Preconditions.checkNotNull(topic, "Topic cannot be null");
+    Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic);
+
+    Set<Subscriber> subscribers = subscriberMap.get(topic);
+    if (subscribers == null) {
+      throw new IllegalArgumentException("cannot unsubscribe from unknown topic " + topic);
+    }
+    if (!subscribers.remove(subscriber)) {
+      throw new IllegalArgumentException("cannot unsubscribe from topic " + topic + ", unknown subscriber");
+    }
+    LOGGER.info("Topic " + topic + ", unsubscribing subscriber " + subscriber + ", total topic subscribers: " + subscribers.size());
+    if (subscribers.isEmpty()) {
+      subscriberMap.remove(topic);
+    }
+  }
+
+  /**
+   * Get all existing topics.
+   */
+  public synchronized Set<Topic> getTopics() {
+    return  subscriberMap.keySet();
+  }
+  
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ":" + Integer.toHexString(hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java
new file mode 100644
index 0000000..6756d91
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PubSubServlet.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db.service.thrift;
+
+import org.apache.sentry.core.common.utils.PubSub;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
+
+/**
+ * This servlet facilitates sending {topic, message } tuples to Servlet components 
+ * subscribed to specific topics.
+ * <p>
+ * It uses publish-subscribe mechanism implemented by PubSub class.
+ * The form generated by this servlet consists of the following elements:
+ * <p>
+ * a) Topic: pull-down menu of existing topics, i.e. the topics registered with
+ * PubSub by calling PubSub.subscribe() API. This prevents entering invalid topic.
+ * <p>
+ * b) Message: text field for entering a message
+ * <p>
+ * c) Submit: button to submit (topic, message) tuple
+ * <p>
+ * d) Status: text area printing status of the request or help information.
+ */
+public class PubSubServlet extends HttpServlet {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PubSubServlet.class);
+
+  private static final String FORM_GET =
+    "<!DOCTYPE html>" +
+    "<html>" +
+    "<body>" +
+    "<form>" +
+    "<br><br><b>Topic:</b><br><br>" +
+    "<select name='topic'/>%s</select>" +
+    "<br><br><b>Message:</b><br><br>" +
+    "<input type='text' size='50' name='message'/>" +
+    "<br><br>" +
+    "<input type='submit' value='Submit'/>" +
+    "</form>" +
+    "<br><br><b>Status:</b><br><br>" +
+    "<textarea rows='4' cols='50'>%s</textarea>" +
+    "</body>" +
+    "</html>";
+
+  /**
+   * Return parameter on servlet request for the given name
+   *
+   * @param request: Servlet request
+   * @param name: Name of parameter in servlet request
+   * @return Parameter in servlet request for the given name, return null if can't find parameter.
+   */
+  private static String getParameter(ServletRequest request, String name) {
+    String s = request.getParameter(name);
+    if (s == null) {
+      return null;
+    }
+    s = s.trim();
+    return s.isEmpty() ? null : s;
+  }
+
+  /**
+   * Parse the topic and message values and submit them via PubSub.submit() API.
+   * Reject request for unknown topic, i.e. topic no one is subscribed to.
+   */
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+          throws ServletException, IOException {
+    String topic = getParameter(request, "topic");
+    String message = getParameter(request, "message");
+    response.setContentType("text/html;charset=utf-8");
+    response.setStatus(HttpServletResponse.SC_OK);
+    PrintWriter out = response.getWriter();
+
+    String msg = "Topic is required, Message is optional.\nValid topics: " + PubSub.getInstance().getTopics();
+    if (topic != null) {
+      LOGGER.info("Submitting topic " + topic + ", message " + message);
+      try {
+        PubSub.getInstance().publish(PubSub.Topic.fromString(topic), message);
+        msg = "Submitted topic " + topic + ", message " + message;
+      } catch (Exception e) {
+        msg = "Failed to submit topic " + topic + ", message " + message + " - " + e.getMessage();
+        LOGGER.error(msg);
+        response.sendError(HttpServletResponse.SC_BAD_REQUEST, msg);
+        return;
+      }
+    }
+
+    StringBuilder topics = new StringBuilder();
+    for (PubSub.Topic t : PubSub.getInstance().getTopics()) {
+      topics.append("<option>").append(t.getName()).append("</option>");
+    }
+
+    String output = String.format(FORM_GET, topics.toString(), escapeHtml(msg));
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("HTML Page: " + output);
+    }
+    out.write(output);
+    out.close();
+    response.flushBuffer();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java
index 66a2f9e..95b87ad 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryWebServer.java
@@ -122,6 +122,11 @@ public class SentryWebServer {
 
     servletContextHandler.addServlet(new ServletHolder(LogLevelServlet.class), "/admin/logLevel");
 
+    if (conf.getBoolean(ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED,
+                        ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED_DEFAULT)) {
+      servletContextHandler.addServlet(new ServletHolder(PubSubServlet.class), "/admin/publishMessage");
+    }
+
     ResourceHandler resourceHandler = new ResourceHandler();
     resourceHandler.setDirectoriesListed(true);
     URL url = this.getClass().getResource(RESOURCE_DIR);

http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 8dd5497..0a1e0ae 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -216,6 +216,9 @@ public class ServiceConstants {
     public static final String SENTRY_WEB_ADMIN_SERVLET_ENABLED = "sentry.web.admin.servlet.enabled";
     public static final boolean SENTRY_WEB_ADMIN_SERVLET_ENABLED_DEFAULT = false;
 
+    public static final String SENTRY_WEB_PUBSUB_SERVLET_ENABLED = "sentry.web.pubsub.servlet.enabled";
+    public static final boolean SENTRY_WEB_PUBSUB_SERVLET_ENABLED_DEFAULT = false;
+
     // max message size for thrift messages
     public static final String SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.server.thrift.max.message.size";
     public static final long SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java
new file mode 100644
index 0000000..451d7a1
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerPubSub.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db.service.thrift;
+
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.core.common.utils.PubSub.Topic;
+import org.apache.sentry.service.thrift.SentryServiceIntegrationBase;
+
+import org.junit.*;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class TestSentryServerPubSub extends SentryServiceIntegrationBase {
+
+  private static final Topic[] topics = Topic.values();
+  private static final String[] messages = { "message1", "message2", "message3", "" };
+
+  private static volatile String REQUEST_URL;
+
+  private final TestSubscriber testSubscriber = new TestSubscriber();
+
+  private static final class TestSubscriber implements PubSub.Subscriber {
+    private volatile Topic topic;
+    private volatile String message;
+    private volatile int count;
+    @Override
+    public void onMessage(Topic topic, String message) {
+      this.topic = topic;
+      this.message = message;
+      this.count++;
+    }
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    webServerEnabled = true;
+    webSecurity = false;
+    SentryServiceIntegrationBase.setup();
+    REQUEST_URL= "http://" + SERVER_HOST + ":" + webServerPort + "/admin/publishMessage?topic=%s&message=%s";
+  }
+
+  @Override
+  @Before
+  public void before() throws Exception {
+
+    // Subscribe  to all defined topics.
+    // After each successfull HTTP-GET, testSubscriber.onMessage()
+    // will be called and "topic" and "message" fields will be
+    // set according to HTTP-GET parameters.
+    testSubscriber.count = 0;
+    for (Topic topic : topics) {
+      PubSub.getInstance().subscribe(topic, testSubscriber);
+    }
+    Assert.assertEquals("Unexpected number of registered topics", topics.length, PubSub.getInstance().getTopics().size());
+  }
+
+  @Override
+  @After
+  public void after() {
+    // unsubscribe
+    for (Topic topic : topics) {
+      PubSub.getInstance().unsubscribe(topic, testSubscriber);
+    }
+    testSubscriber.count = 0;
+    Assert.assertTrue("Topics should have been removed after unsubscribe()", PubSub.getInstance().getTopics().isEmpty());
+  }
+
+  /**
+   * Successfully publish notifications
+   * @throws Exception
+   */
+  @Test
+  public void testPubSub() throws Exception {
+    int count = 0;
+    for (Topic topic : topics) {
+      for (String message : messages) {
+        URL url = new URL(String.format(REQUEST_URL, topic.getName(), message));
+        HttpURLConnection conn = null;
+        try {
+          conn = (HttpURLConnection) url.openConnection();
+          Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
+        } finally {
+          safeClose(conn);
+        }
+        Assert.assertEquals("Unexpected topic", topic, testSubscriber.topic);
+        if (message.isEmpty()) {
+          Assert.assertEquals("Unexpected message", null, testSubscriber.message);
+        } else {
+          Assert.assertEquals("Unexpected message", message, testSubscriber.message);
+        }
+        Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", ++count, testSubscriber.count);
+      }
+    }
+  }
+
+  /**
+   * Submit empty topic. It's ok, generates form page.
+   * @throws Exception
+   */
+  @Test
+  public void testPubSubEmptyTopic() throws Exception {
+    URL url = new URL(String.format(REQUEST_URL, "", "message"));
+    HttpURLConnection conn = null;
+    try {
+      conn = (HttpURLConnection) url.openConnection();
+      Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_OK,  conn.getResponseCode());
+    } finally {
+      safeClose(conn);
+    }
+    Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0, testSubscriber.count);
+  }
+
+  /**
+   * Submit invalid topic
+   * @throws Exception
+   */
+  @Test
+  public void testPubSubInvalidTopic() throws Exception {
+    String[] invalid_topics = { "invalid_topic_1", "invalid_topic_2", "invalid_topic_3" };
+    for (String topic : invalid_topics) {
+      URL url = new URL(String.format(REQUEST_URL, topic, "message"));
+      HttpURLConnection conn = null;
+      try {
+        conn = (HttpURLConnection) url.openConnection();
+        Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_BAD_REQUEST,  conn.getResponseCode());
+      } finally {
+        safeClose(conn);
+      }
+      Assert.assertEquals("Unexpected number of PubSub.onMessage() callbacks", 0, testSubscriber.count);
+    }
+  }
+
+  /**
+   * Submit topic that has no subscribers.
+   * @throws Exception
+   */
+  @Test
+  public void testPubSubNonSubscribedTopic() throws Exception {
+    // At this point all valid Topic values have been subscribed to
+    // in before() method.
+    // Unsubscribe from one topic and then try publishing to it.
+    PubSub.getInstance().unsubscribe(Topic.HDFS_SYNC_HMS, testSubscriber);
+    Assert.assertEquals("Unexpected number of registered topics", topics.length-1, PubSub.getInstance().getTopics().size());
+
+    URL url = new URL(String.format(REQUEST_URL, Topic.HDFS_SYNC_HMS.getName(), "message"));
+    HttpURLConnection conn = null;
+    try {
+      conn = (HttpURLConnection) url.openConnection();
+      Assert.assertEquals("Unexpected response code", HttpURLConnection.HTTP_BAD_REQUEST,  conn.getResponseCode());
+    } finally {
+      safeClose(conn);
+    }
+    // re-subscribe, not to upset after() method which expects all topics to be subscribed to
+    PubSub.getInstance().subscribe(Topic.HDFS_SYNC_HMS, testSubscriber);
+  }
+
+  private static void safeClose(HttpURLConnection conn) {
+    if (conn != null) {
+      try {
+        conn.disconnect();
+      } catch (Exception ignore) {
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/64476a74/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
index 7d9b3ba..65f1e30 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
@@ -144,6 +144,7 @@ public abstract class SentryServiceIntegrationBase extends SentryMiniKdcTestcase
     if (webServerEnabled) {
       conf.set(ServerConfig.SENTRY_WEB_ENABLE, "true");
       conf.set(ServerConfig.SENTRY_WEB_PORT, String.valueOf(webServerPort));
+      conf.set(ServerConfig.SENTRY_WEB_PUBSUB_SERVLET_ENABLED, "true");
       if (webSecurity) {
         httpKeytab = new File(kdcWorkDir, "http.keytab");
         kdc.createPrincipal(httpKeytab, HTTP_PRINCIPAL);