You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/06 07:38:58 UTC

[1/2] incubator-atlas git commit: ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 334429a83 -> 1e3029bc7


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
index 79b8124..64e6e92 100755
--- a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
+++ b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
@@ -21,7 +21,6 @@ package org.apache.atlas.examples;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
@@ -112,9 +111,9 @@ public class QuickStart {
 
     private final AtlasClient metadataServiceClient;
 
-    QuickStart(String baseUrl) {
+    QuickStart(String baseUrl) throws AtlasException {
         String[] urls = baseUrl.split(",");
-        metadataServiceClient = new AtlasClient(null, null, urls);
+        metadataServiceClient = new AtlasClient(urls);
     }
 
     void createTypes() throws Exception {
@@ -292,11 +291,11 @@ public class QuickStart {
 
         String entityJSON = InstanceSerialization.toJson(referenceable, true);
         System.out.println("Submitting new entity= " + entityJSON);
-        JSONArray guids = metadataServiceClient.createEntity(entityJSON);
+        List<String> guids = metadataServiceClient.createEntity(entityJSON);
         System.out.println("created instance for type " + typeName + ", guid: " + guids);
 
         // return the Id for created instance with guid
-        return new Id(guids.getString(guids.length()-1), referenceable.getId().getVersion(),
+        return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
                 referenceable.getTypeName());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
new file mode 100644
index 0000000..8ef2f64
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import kafka.consumer.ConsumerTimeoutException;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consumer of notifications from hooks e.g., hive hook etc.
+ *
+ * One {@link HookConsumer} is created per notification consumer and submitted to an executor; every
+ * message it reads is applied through the injected {@link LocalAtlasClient}.
+ */
+@Singleton
+public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
+
+    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
+    private final LocalAtlasClient atlasClient;
+
+    private NotificationInterface notificationInterface;
+    private ExecutorService executors;
+    private Configuration applicationProperties;
+    private List<HookConsumer> consumers;
+
+    @Inject
+    public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) {
+        this.notificationInterface = notificationInterface;
+        this.atlasClient = atlasClient;
+    }
+
+    /**
+     * Loads the application configuration and delegates to {@link #startInternal}.
+     *
+     * @throws AtlasException if the application configuration cannot be obtained
+     */
+    @Override
+    public void start() throws AtlasException {
+        Configuration configuration = ApplicationProperties.get();
+        startInternal(configuration, null);
+    }
+
+    /**
+     * Records the configuration and, when given, the executor; starts the consumers only if HA is disabled.
+     *
+     * @param configuration   configuration the consumer thread count and HA setting are read from
+     * @param executorService executor to run consumers on; when null a fixed thread pool is created
+     */
+    void startInternal(Configuration configuration,
+                       ExecutorService executorService) {
+        this.applicationProperties = configuration;
+        if (consumers == null) {
+            consumers = new ArrayList<>();
+        }
+        if (executorService != null) {
+            executors = executorService;
+        }
+        if (!HAConfiguration.isHAEnabled(configuration)) {
+            LOG.info("HA is disabled, starting consumers inline.");
+            startConsumers(executorService);
+        }
+    }
+
+    /**
+     * Creates the hook notification consumers and submits one {@link HookConsumer} per consumer.
+     *
+     * @param executorService executor to use; when null a fixed pool sized to the consumer count is created
+     */
+    private void startConsumers(ExecutorService executorService) {
+        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+        List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
+                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+        if (executorService == null) {
+            executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
+                    new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+        }
+        executors = executorService;
+        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
+            HookConsumer hookConsumer = new HookConsumer(consumer);
+            consumers.add(hookConsumer);
+            executors.submit(hookConsumer);
+        }
+    }
+
+    /**
+     * Closes the notification interface, signals every consumer to stop and shuts the executor down,
+     * waiting up to five seconds for the consumer threads to terminate.
+     */
+    @Override
+    public void stop() {
+        //Allow for completion of outstanding work
+        notificationInterface.close();
+        if (executors == null) {
+            return;
+        }
+        try {
+            stopConsumerThreads();
+            executors.shutdownNow();
+            if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so the caller can observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.error("Failure in shutting down consumers", e);
+        } finally {
+            // Never keep a reference to a shut-down executor: a later startConsumers(executors)
+            // would otherwise try to submit to it.
+            executors = null;
+        }
+    }
+
+    /** Signals every registered consumer to stop and clears the list. */
+    private void stopConsumerThreads() {
+        if (consumers != null) {
+            for (HookConsumer consumer : consumers) {
+                consumer.stop();
+            }
+            consumers.clear();
+        }
+    }
+
+    /**
+     * Start Kafka consumer threads that read from Kafka topic when server is activated.
+     *
+     * Since the consumers create / update entities to the shared backend store, only the active instance
+     * should perform this activity. Hence, these threads are started only on server activation.
+     */
+    @Override
+    public void instanceIsActive() {
+        LOG.info("Reacting to active state: initializing Kafka consumers");
+        startConsumers(executors);
+    }
+
+    /**
+     * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
+     *
+     * Since the consumers create / update entities to the shared backend store, only the active instance
+     * should perform this activity. Hence, these threads are stopped only on server deactivation.
+     */
+    @Override
+    public void instanceIsPassive() {
+        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+        stop();
+    }
+
+    /**
+     * Thin wrapper over {@link Thread#sleep(long)}; handed to {@link HookConsumer#serverAvailable(Timer)}
+     * so the wait between readiness checks can be substituted.
+     */
+    static class Timer {
+        public void sleep(int interval) throws InterruptedException {
+            Thread.sleep(interval);
+        }
+    }
+
+    /**
+     * Runnable that drains one notification consumer and applies each message through the Atlas client.
+     */
+    class HookConsumer implements Runnable {
+        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
+        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
+        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+            this.consumer = consumer;
+        }
+
+        /**
+         * @return whether a message is available; a consumer timeout is reported as {@code false}
+         */
+        private boolean hasNext() {
+            try {
+                return consumer.hasNext();
+            } catch (ConsumerTimeoutException e) {
+                return false;
+            }
+        }
+
+        /**
+         * Waits for the server to become ready, then processes messages until {@link #stop()} is called.
+         * A failure while handling one message is logged and the loop moves on to the next message.
+         */
+        @Override
+        public void run() {
+            shouldRun.set(true);
+
+            if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+                return;
+            }
+
+            while (shouldRun.get()) {
+                try {
+                    if (hasNext()) {
+                        HookNotification.HookNotificationMessage message = consumer.next();
+                        atlasClient.setUser(message.getUser());
+                        try {
+                            switch (message.getType()) {
+                            case ENTITY_CREATE:
+                                HookNotification.EntityCreateRequest createRequest =
+                                        (HookNotification.EntityCreateRequest) message;
+                                atlasClient.createEntity(createRequest.getEntities());
+                                break;
+
+                            case ENTITY_PARTIAL_UPDATE:
+                                HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                                        (HookNotification.EntityPartialUpdateRequest) message;
+                                atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                                        partialUpdateRequest.getAttribute(),
+                                        partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
+                                break;
+
+                            case ENTITY_DELETE:
+                                HookNotification.EntityDeleteRequest deleteRequest =
+                                    (HookNotification.EntityDeleteRequest) message;
+                                atlasClient.deleteEntity(deleteRequest.getTypeName(),
+                                    deleteRequest.getAttribute(),
+                                    deleteRequest.getAttributeValue());
+                                break;
+
+                            case ENTITY_FULL_UPDATE:
+                                HookNotification.EntityUpdateRequest updateRequest =
+                                        (HookNotification.EntityUpdateRequest) message;
+                                atlasClient.updateEntities(updateRequest.getEntities());
+                                break;
+
+                            default:
+                                throw new IllegalStateException("Unhandled message type " + message.getType());
+                            }
+                        } catch (Exception e) {
+                            //todo handle failures
+                            LOG.warn("Error handling message {}", message, e);
+                        }
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Failure in NotificationHookConsumer", t);
+                }
+            }
+        }
+
+        /**
+         * Polls the Atlas client until it reports the server as ready.
+         *
+         * @param timer used to sleep {@code SERVER_READY_WAIT_TIME_MS} between checks
+         * @return {@code true} once the server is ready; {@code false} if the wait was interrupted or the
+         *         readiness check threw
+         */
+        boolean serverAvailable(Timer timer) {
+            try {
+                while (!atlasClient.isServerReady()) {
+                    try {
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+                                SERVER_READY_WAIT_TIME_MS);
+                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, "
+                                + "exiting consumer thread.", e);
+                        return false;
+                    }
+                }
+            } catch (Throwable e) {
+                // Any failure lands here, not only AtlasServiceException, so the message must not name one.
+                LOG.error("Failure while waiting for Atlas Server to become ready, exiting consumer thread.", e);
+                return false;
+            }
+            LOG.info("Atlas Server is ready, can start reading Kafka events.");
+            return true;
+        }
+
+        /** Asks the processing loop in {@link #run()} to exit after the current iteration. */
+        public void stop() {
+            shouldRun.set(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index 01b1cd3..2d84b10 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -25,7 +25,6 @@ import org.apache.atlas.security.SecurityProperties;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
@@ -57,10 +56,6 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
     static final String PREFIX = "atlas.http.authentication";
 
-    /**
-     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
-     * before invoking the actual resource.
-     */
     private HttpServlet optionsServlet;
 
     /**
@@ -128,47 +123,45 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
     @Override
     public void doFilter(final ServletRequest request, final ServletResponse response,
                          final FilterChain filterChain) throws IOException, ServletException {
-
         FilterChain filterChainWrapper = new FilterChain() {
-
             @Override
             public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
                     throws IOException, ServletException {
-                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+                final HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
 
-                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+                if (httpRequest.getMethod().equals("OPTIONS")) {
                     optionsServlet.service(request, response);
+
                 } else {
-                    final String user = Servlets.getUserFromRequest(httpRequest);
-                    if (StringUtils.isEmpty(user)) {
-                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                                "Param user.name can't be empty");
-                    } else {
-                        try {
-                            NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
-                            RequestContext requestContext = RequestContext.get();
-                            requestContext.setUser(user);
-                            LOG.info("Request from authenticated user: {}, URL={}", user,
-                                    Servlets.getRequestURI(httpRequest));
-
-                            filterChain.doFilter(servletRequest, servletResponse);
-                        } finally {
-                            NDC.pop();
-                        }
+                    try {
+                        String requestUser = httpRequest.getRemoteUser();
+                        NDC.push(requestUser + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
+                        RequestContext requestContext = RequestContext.get();
+                        requestContext.setUser(requestUser);
+                        LOG.info("Request from authenticated user: {}, URL={}", requestUser,
+                                Servlets.getRequestURI(httpRequest));
+
+                        filterChain.doFilter(servletRequest, servletResponse);
+                    } finally {
+                        NDC.pop();
                     }
                 }
             }
         };
 
-        super.doFilter(request, response, filterChainWrapper);
+        try {
+            super.doFilter(request, response, filterChainWrapper);
+        } catch (NullPointerException e) {
+            //PseudoAuthenticationHandler.getUserName() from hadoop-auth throws NPE if user name is not specified
+            ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                    "Authentication is enabled and user is not specified. Specify user.name parameter");
+        }
     }
 
+
     @Override
     public void destroy() {
-        if (optionsServlet != null) {
-            optionsServlet.destroy();
-        }
-
+        optionsServlet.destroy();
         super.destroy();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 9d60e1a..eeaddd6 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -99,7 +99,7 @@ public class AuditFilter implements Filter {
         return userFromRequest == null ? "UNKNOWN" : userFromRequest;
     }
 
-    private void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
+    public static void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
             String whenISO9601) {
         AUDIT_LOG.info("Audit: {}/{}-{} performed request {} {} ({}) at time {}", who, fromAddress, fromHost, whatRequest, whatURL,
                 whatAddrs, whenISO9601);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index 1eca174..010fa2a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -58,7 +58,6 @@ public class GuiceServletConfig extends GuiceServletContextListener {
     private static final Logger LOG = LoggerFactory.getLogger(GuiceServletConfig.class);
 
     private static final String GUICE_CTX_PARAM = "guice.packages";
-    static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
     protected volatile Injector injector;
 
     @Override
@@ -126,7 +125,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
                             if (configuration == null) {
                                 throw new ConfigurationException("Could not load application configuration");
                             }
-                            if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
+                            if (Boolean.valueOf(configuration.getString(AtlasClient.HTTP_AUTHENTICATION_ENABLED))) {
                                 LOG.info("Enabling AuthenticationFilter");
                                 filter("/*").through(AtlasAuthenticationFilter.class);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 6068007..36b7607 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.web.resources;
 
 import com.google.inject.Inject;
+import org.apache.atlas.AtlasClient;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.configuration.ConfigurationException;
@@ -113,7 +114,7 @@ public class AdminResource {
     public Response getStatus() {
         JSONObject responseData = new JSONObject();
         try {
-            responseData.put("Status", serviceState.getState().toString());
+            responseData.put(AtlasClient.STATUS, serviceState.getState().toString());
             Response response = Response.ok(responseData).build();
             return response;
         } catch (JSONException e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index b14aa80..709fec5 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -330,7 +330,6 @@ public class EntityResource {
      * @param entityType the entity type
      * @param attribute the unique attribute used to identify the entity
      * @param value the unique attribute value used to identify the entity
-     * @param request - Ignored
      * @return response payload as json - including guids of entities(including composite references from that entity) that were deleted
      */
     @DELETE
@@ -338,8 +337,7 @@ public class EntityResource {
     public Response deleteEntities(@QueryParam("guid") List<String> guids,
         @QueryParam("type") String entityType,
         @QueryParam("property") String attribute,
-        @QueryParam("value") String value,
-        @Context HttpServletRequest request) {
+        @QueryParam("value") String value) {
         
         try {
             List<String> deletedGuids = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
index 480a232..b4f0839 100755
--- a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
+++ b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
@@ -19,10 +19,13 @@
 package org.apache.atlas.web.util;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.LocalServletRequest;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -34,6 +37,8 @@ import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
 
 /**
  * Utility functions for dealing with servlets.
@@ -70,6 +75,28 @@ public final class Servlets {
             return user;
         }
 
+        user = getDoAsUser(httpRequest);
+        if (!StringUtils.isEmpty(user)) {
+            return user;
+        }
+
+        return null;
+    }
+
+    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+    private static final String DO_AS = "doAs";
+
+    public static String getDoAsUser(HttpServletRequest request) {
+        if (StringUtils.isNoneEmpty(request.getQueryString())) {
+            List<NameValuePair> list = URLEncodedUtils.parse(request.getQueryString(), UTF8_CHARSET);
+            if (list != null) {
+                for (NameValuePair nv : list) {
+                    if (DO_AS.equals(nv.getName())) {
+                        return nv.getValue();
+                    }
+                }
+            }
+        }
         return null;
     }
 
@@ -134,6 +161,11 @@ public final class Servlets {
     }
 
     public static String getRequestPayload(HttpServletRequest request) throws IOException {
+        //request is an instance of LocalServletRequest for calls from LocalAtlasClient
+        if (request instanceof LocalServletRequest) {
+            return ((LocalServletRequest) request).getPayload();
+        }
+
         StringWriter writer = new StringWriter();
         IOUtils.copy(request.getInputStream(), writer);
         return writer.toString();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
new file mode 100644
index 0000000..7f20652
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.web.resources.EntityResource;
+import org.apache.atlas.web.service.ServiceState;
+import org.apache.commons.lang.RandomStringUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for {@link LocalAtlasClient} running against mocked {@link EntityResource} and
+ * {@link ServiceState} collaborators.
+ */
+public class LocalAtlasClientTest {
+    @Mock
+    private EntityResource entityResource;
+
+    @Mock
+    private ServiceState serviceState;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /** The guid array in the resource's response comes back from createEntity as a list. */
+    @Test
+    public void testCreateEntity() throws Exception {
+        final String expectedGuid = random();
+        Response createResponse = mock(Response.class);
+        when(entityResource.submit(any(HttpServletRequest.class))).thenReturn(createResponse);
+        when(createResponse.getEntity()).thenReturn(
+                jsonOf(AtlasClient.GUID, new JSONArray(Arrays.asList(expectedGuid))));
+
+        List<String> createdGuids = newClient().createEntity(new Referenceable(random()));
+        assertEquals(createdGuids.size(), 1);
+        assertEquals(createdGuids.get(0), expectedGuid);
+    }
+
+    /** A WebApplicationException from the resource surfaces as AtlasServiceException with the same status. */
+    @Test
+    public void testException() throws Exception {
+        LocalAtlasClient client = newClient();
+
+        Response errorResponse = mock(Response.class);
+        when(entityResource.submit(any(HttpServletRequest.class)))
+                .thenThrow(new WebApplicationException(errorResponse));
+        when(errorResponse.getEntity()).thenReturn(jsonOf("stackTrace", "stackTrace"));
+        when(errorResponse.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode());
+        try {
+            client.createEntity(new Referenceable(random()));
+            fail("Expected AtlasServiceException");
+        } catch (AtlasServiceException e) {
+            assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
+        }
+
+        when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
+                any(HttpServletRequest.class))).thenThrow(new WebApplicationException(errorResponse));
+        when(errorResponse.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode());
+        try {
+            client.updateEntity(random(), random(), random(), new Referenceable(random()));
+            fail("Expected AtlasServiceException");
+        } catch (AtlasServiceException e) {
+            assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
+        }
+    }
+
+    /** isServerReady is true for the ACTIVE service state and false for BECOMING_ACTIVE. */
+    @Test
+    public void testIsServerReady() throws Exception {
+        when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
+        LocalAtlasClient client = newClient();
+        assertTrue(client.isServerReady());
+
+        when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
+        assertFalse(client.isServerReady());
+    }
+
+    /** The guid in the resource's response is what updateEntity returns. */
+    @Test
+    public void testUpdateEntity() throws Exception {
+        final String expectedGuid = random();
+        Response updateResponse = mock(Response.class);
+        when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
+                any(HttpServletRequest.class))).thenReturn(updateResponse);
+        when(updateResponse.getEntity()).thenReturn(jsonOf(AtlasClient.GUID, expectedGuid));
+
+        String returnedGuid =
+                newClient().updateEntity(random(), random(), random(), new Referenceable(random()));
+        assertEquals(returnedGuid, expectedGuid);
+    }
+
+    /** The guid array in the resource's response comes back from deleteEntity as a list. */
+    @Test
+    public void testDeleteEntity() throws Exception {
+        final String expectedGuid = random();
+        Response deleteResponse = mock(Response.class);
+        when(deleteResponse.getEntity()).thenReturn(
+                jsonOf(AtlasClient.GUID, new JSONArray(Arrays.asList(expectedGuid))));
+        when(entityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString()))
+                .thenReturn(deleteResponse);
+
+        List<String> deletedGuids = newClient().deleteEntity(random(), random(), random());
+        assertEquals(deletedGuids.size(), 1);
+        assertEquals(deletedGuids.get(0), expectedGuid);
+    }
+
+    /** Builds a client wired to the mocked service state and entity resource. */
+    private LocalAtlasClient newClient() throws Exception {
+        return new LocalAtlasClient(serviceState, entityResource);
+    }
+
+    /** Builds a JSON object holding the single given key/value pair. */
+    private JSONObject jsonOf(String key, Object value) throws Exception {
+        JSONObject json = new JSONObject();
+        json.put(key, value);
+        return json;
+    }
+
+    private String random() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 03a0d3f..72f403e 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.notification;
 
 import com.google.inject.Inject;
+import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.persistence.Id;
@@ -29,6 +30,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.util.List;
+
 import static org.testng.Assert.assertEquals;
 
 @Guice(modules = NotificationModule.class)
@@ -55,6 +58,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
     }
 
     @Test
+    public void testMessageHandleFailureConsumerContinues() throws Exception {
+        //send invalid message - update with invalid type
+        sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
+                new Referenceable(randomString())));
+
+        //send valid message
+        final Referenceable entity = new Referenceable(DATABASE_TYPE);
+        entity.set("name", "db" + randomString());
+        entity.set("description", randomString());
+        sendHookMessage(new HookNotification.EntityCreateRequest(TEST_USER, entity));
+
+        waitFor(MAX_WAIT_TIME, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+                        entity.get("name")));
+                return results.length() == 1;
+            }
+        });
+    }
+
+    @Test
     public void testCreateEntity() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE);
         entity.set("name", "db" + randomString());
@@ -70,6 +95,13 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
                 return results.length() == 1;
             }
         });
+
+        //Assert that user passed in hook message is used in audit
+        Referenceable instance = serviceClient.getEntity(DATABASE_TYPE, "name", (String) entity.get("name"));
+        List<EntityAuditEvent> events =
+                serviceClient.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+        assertEquals(events.size(), 1);
+        assertEquals(events.get(0).getUser(), TEST_USER);
     }
 
     @Test
@@ -132,7 +164,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         final String dbName = "db" + randomString();
         entity.set("name", dbName);
         entity.set("description", randomString());
-        final String dbId = serviceClient.createEntity(entity).getString(0);
+        final String dbId = serviceClient.createEntity(entity).get(0);
 
         sendHookMessage(
             new HookNotification.EntityDeleteRequest(TEST_USER, DATABASE_TYPE, "name", dbName));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
new file mode 100644
index 0000000..8765826
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class NotificationHookConsumerTest {
+
+    @Mock
+    private NotificationInterface notificationInterface;
+
+    @Mock
+    private LocalAtlasClient atlasClient;
+
+    @Mock
+    private Configuration configuration;
+
+    @Mock
+    private ExecutorService executorService;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenReturn(true);
+
+        assertTrue(hookConsumer.serverAvailable(timer));
+
+        verifyZeroInteractions(timer);
+    }
+
+    @Test
+    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
+
+        assertTrue(hookConsumer.serverAvailable(timer));
+
+        verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+    }
+
+    @Test
+    public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+        when(atlasClient.isServerReady()).thenReturn(false);
+
+        assertFalse(hookConsumer.serverAvailable(timer));
+    }
+
+    @Test
+    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
+                new Exception()));
+
+        assertFalse(hookConsumer.serverAvailable(timer));
+    }
+
+    /**
+     * With the HA-enabled flag stubbed to {@code false}, {@code startInternal} is expected to ask the
+     * notification interface for one HOOK consumer and submit a {@code HookConsumer} to the executor.
+     */
+    @Test
+    public void testConsumersStartedIfHAIsDisabled() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        // explicit type arguments instead of a raw ArrayList, so the assignment is not an unchecked conversion
+        List<NotificationConsumer<Object>> consumers = new ArrayList<NotificationConsumer<Object>>();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+    }
+
+    /**
+     * With the HA-enabled flag stubbed to {@code true}, {@code startInternal} alone must not touch the
+     * notification interface at all.
+     */
+    @Test
+    public void testConsumersAreNotStartedIfHAIsEnabled() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        // explicit type arguments instead of a raw ArrayList, so the assignment is not an unchecked conversion
+        List<NotificationConsumer<Object>> consumers = new ArrayList<NotificationConsumer<Object>>();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        verifyZeroInteractions(notificationInterface);
+    }
+
+    /**
+     * With the HA-enabled flag stubbed to {@code true}, calling {@code instanceIsActive()} after
+     * {@code startInternal} is expected to create the HOOK consumer and submit a {@code HookConsumer}
+     * to the executor.
+     */
+    @Test
+    public void testConsumersAreStartedWhenInstanceBecomesActive() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        // explicit type arguments instead of a raw ArrayList, so the assignment is not an unchecked conversion
+        List<NotificationConsumer<Object>> consumers = new ArrayList<NotificationConsumer<Object>>();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        notificationHookConsumer.instanceIsActive();
+        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+    }
+
+    /**
+     * With the HA-enabled flag stubbed to {@code true}, calling {@code instanceIsPassive()} after
+     * {@code startInternal} is expected to close the notification interface and call
+     * {@code shutdownNow()} on the executor.
+     */
+    @Test
+    public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        // explicit type arguments instead of a raw ArrayList, so the assignment is not an unchecked conversion
+        List<NotificationConsumer<Object>> consumers = new ArrayList<NotificationConsumer<Object>>();
+        consumers.add(mock(NotificationConsumer.class));
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+                thenReturn(consumers);
+        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+        notificationHookConsumer.startInternal(configuration, executorService);
+        notificationHookConsumer.instanceIsPassive();
+        verify(notificationInterface).close();
+        verify(executorService).shutdownNow();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
deleted file mode 100644
index 9e1e08f..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.webapp.WebAppContext;
-import org.testng.annotations.Test;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- *
- */
-public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
-    public static final String TEST_USER_JAAS_SECTION = "TestUser";
-    public static final String TESTUSER = "testuser";
-    public static final String TESTPASS = "testpass";
-
-    private File userKeytabFile;
-    private File httpKeytabFile;
-
-    class TestEmbeddedServer extends EmbeddedServer {
-        public TestEmbeddedServer(int port, String path) throws IOException {
-            super(port, path);
-        }
-
-        Server getServer() {
-            return server;
-        }
-
-        @Override
-        protected WebAppContext getWebAppContext(String path) {
-            WebAppContext application = new WebAppContext(path, "/");
-            application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
-            application.setClassLoader(Thread.currentThread().getContextClassLoader());
-            return application;
-        }
-    }
-
-    @Test(enabled = false)
-    public void testKerberosBasedLogin() throws Exception {
-        String originalConf = System.getProperty("atlas.conf");
-        System.setProperty("atlas.conf", System.getProperty("user.dir"));
-
-        setupKDCAndPrincipals();
-        TestEmbeddedServer server = null;
-
-        try {
-            // setup the atlas-application.properties file
-            generateKerberosTestProperties();
-
-            // need to create the web application programmatically in order to control the injection of the test
-            // application properties
-            server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
-
-            startEmbeddedServer(server.getServer());
-
-            final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
-            // attempt to hit server and get rejected
-            URL url = new URL("http://localhost:23000/");
-            HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
-            connection.setRequestMethod("GET");
-            connection.connect();
-
-            assertEquals(connection.getResponseCode(), 401);
-
-            // need to populate the ticket cache with a local user, so logging in...
-            Subject subject = loginTestUser();
-
-            Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
-                @Override
-                public Object run() throws Exception {
-                    // attempt to hit server and get rejected
-                    URL url = new URL("http://localhost:23000/");
-                    HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
-                    connection.setRequestMethod("GET");
-                    connection.connect();
-
-                    assertEquals(connection.getResponseCode(), 200);
-                    assertEquals(RequestContext.get().getUser(), TESTUSER);
-                    return null;
-                }
-            });
-        } finally {
-            server.getServer().stop();
-            kdc.stop();
-
-            if (originalConf != null) {
-                System.setProperty("atlas.conf", originalConf);
-            } else {
-                System.clearProperty("atlas.conf");
-            }
-
-        }
-
-
-    }
-
-    protected Subject loginTestUser() throws LoginException, IOException {
-        LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
-
-            @Override
-            public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
-                for (int i = 0; i < callbacks.length; i++) {
-                    if (callbacks[i] instanceof PasswordCallback) {
-                        PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
-                        passwordCallback.setPassword(TESTPASS.toCharArray());
-                    }
-                    if (callbacks[i] instanceof NameCallback) {
-                        NameCallback nameCallback = (NameCallback) callbacks[i];
-                        nameCallback.setName(TESTUSER);
-                    }
-                }
-            }
-        });
-        // attempt authentication
-        lc.login();
-        return lc.getSubject();
-    }
-
-    protected void generateKerberosTestProperties() throws IOException, ConfigurationException {
-        Properties props = new Properties();
-        props.setProperty("atlas.http.authentication.enabled", "true");
-        props.setProperty("atlas.http.authentication.type", "kerberos");
-        props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
-        props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
-        props.setProperty("atlas.http.authentication.kerberos.name.rules",
-                "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
-
-        generateTestProperties(props);
-    }
-
-    public void setupKDCAndPrincipals() throws Exception {
-        // set up the KDC
-        File kdcWorkDir = startKDC();
-
-        userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
-        httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
-
-        // create a test user principal
-        kdc.createPrincipal(TESTUSER, TESTPASS);
-
-        StringBuilder jaas = new StringBuilder(1024);
-        jaas.append("TestUser {\n" +
-                "    com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
-                "};\n");
-        jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
-        jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
-
-        File jaasFile = new File(kdcWorkDir, "jaas.txt");
-        FileUtils.write(jaasFile, jaas.toString());
-        bindJVMtoJAASFile(jaasFile);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
new file mode 100644
index 0000000..f85892a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.testng.annotations.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ */
+public class AtlasAuthenticationKerberosFilterTest extends BaseSecurityTest {
+    public static final String TEST_USER_JAAS_SECTION = "TestUser";
+    public static final String TESTUSER = "testuser";
+    public static final String TESTPASS = "testpass";
+
+    private File userKeytabFile;
+    private File httpKeytabFile;
+
+    class TestEmbeddedServer extends EmbeddedServer {
+        public TestEmbeddedServer(int port, String path) throws IOException {
+            super(port, path);
+        }
+
+        Server getServer() {
+            return server;
+        }
+
+        @Override
+        protected WebAppContext getWebAppContext(String path) {
+            WebAppContext application = new WebAppContext(path, "/");
+            application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+            application.setClassLoader(Thread.currentThread().getContextClassLoader());
+            return application;
+        }
+    }
+
+    /**
+     * Starts an embedded server on port 23000 with generated Kerberos test properties, asserts that a
+     * plain GET is answered with 401, then repeats the GET inside {@code Subject.doAs} for the logged-in
+     * test user and asserts a 200 plus the expected request user. The test is currently disabled.
+     *
+     * @throws Exception if KDC setup, server startup or any of the requests fail
+     */
+    @Test(enabled = false)
+    public void testKerberosBasedLogin() throws Exception {
+        String originalConf = System.getProperty("atlas.conf");
+
+        setupKDCAndPrincipals();
+        TestEmbeddedServer server = null;
+
+        try {
+            // setup the atlas-application.properties file
+            String confDirectory = generateKerberosTestProperties();
+            System.setProperty("atlas.conf", confDirectory);
+
+            // need to create the web application programmatically in order to control the injection of the test
+            // application properties
+            server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
+
+            startEmbeddedServer(server.getServer());
+
+            final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
+            // attempt to hit server and get rejected
+            URL url = new URL("http://localhost:23000/");
+            HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
+            connection.setRequestMethod("GET");
+            connection.connect();
+
+            assertEquals(connection.getResponseCode(), 401);
+
+            // need to populate the ticket cache with a local user, so logging in...
+            Subject subject = loginTestUser();
+
+            Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
+                @Override
+                public Object run() throws Exception {
+                    // same request issued as the logged-in test user; this time it is expected to be accepted
+                    URL url = new URL("http://localhost:23000/");
+                    HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
+                    connection.setRequestMethod("GET");
+                    connection.connect();
+
+                    assertEquals(connection.getResponseCode(), 200);
+                    assertEquals(RequestContext.get().getUser(), TESTUSER);
+                    return null;
+                }
+            });
+        } finally {
+            // server is still null when property generation or server construction failed; without this
+            // guard the NullPointerException would replace the real failure and skip the cleanup below
+            if (server != null) {
+                server.getServer().stop();
+            }
+            kdc.stop();
+
+            if (originalConf != null) {
+                System.setProperty("atlas.conf", originalConf);
+            } else {
+                System.clearProperty("atlas.conf");
+            }
+
+        }
+    }
+
+    protected Subject loginTestUser() throws LoginException, IOException {
+        LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
+
+            @Override
+            public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+                for (int i = 0; i < callbacks.length; i++) {
+                    if (callbacks[i] instanceof PasswordCallback) {
+                        PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
+                        passwordCallback.setPassword(TESTPASS.toCharArray());
+                    }
+                    if (callbacks[i] instanceof NameCallback) {
+                        NameCallback nameCallback = (NameCallback) callbacks[i];
+                        nameCallback.setName(TESTUSER);
+                    }
+                }
+            }
+        });
+        // attempt authentication
+        lc.login();
+        return lc.getSubject();
+    }
+
+    protected String generateKerberosTestProperties() throws Exception {
+        PropertiesConfiguration props = new PropertiesConfiguration();
+        props.setProperty("atlas.http.authentication.enabled", "true");
+        props.setProperty("atlas.http.authentication.type", "kerberos");
+        props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
+        props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
+        props.setProperty("atlas.http.authentication.kerberos.name.rules",
+                "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
+
+        return writeConfiguration(props);
+    }
+
+    public void setupKDCAndPrincipals() throws Exception {
+        // set up the KDC
+        File kdcWorkDir = startKDC();
+
+        userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
+        httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
+
+        // create a test user principal
+        kdc.createPrincipal(TESTUSER, TESTPASS);
+
+        StringBuilder jaas = new StringBuilder(1024);
+        jaas.append("TestUser {\n" +
+                "    com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
+                "};\n");
+        jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
+        jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
+
+        File jaasFile = new File(kdcWorkDir, "jaas.txt");
+        FileUtils.write(jaasFile, jaas.toString());
+        bindJVMtoJAASFile(jaasFile);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
deleted file mode 100644
index ca53096..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.eclipse.jetty.server.Server;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- *
- */
-public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
-    public static final String TESTUSER = "testuser";
-
-    class TestEmbeddedServer extends EmbeddedServer {
-        public TestEmbeddedServer(int port, String path) throws IOException {
-            super(port, path);
-        }
-
-        Server getServer() {
-            return server;
-        }
-    }
-
-    @Test(enabled = false)
-    public void testSimpleLogin() throws Exception {
-        String originalConf = System.getProperty("atlas.conf");
-        System.setProperty("atlas.conf", System.getProperty("user.dir"));
-        generateSimpleLoginConfiguration();
-
-        TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
-
-        try {
-            startEmbeddedServer(server.getServer());
-
-            URL url = new URL("http://localhost:23001");
-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("GET");
-            connection.connect();
-
-            try {
-                assertEquals(connection.getResponseCode(), 403);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-            url = new URL("http://localhost:23001/?user.name=testuser");
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("GET");
-            connection.connect();
-
-            assertEquals(connection.getResponseCode(), 200);
-            assertEquals(RequestContext.get().getUser(), TESTUSER);
-        } finally {
-            server.getServer().stop();
-            if (originalConf != null) {
-                System.setProperty("atlas.conf", originalConf);
-            } else {
-                System.clearProperty("atlas.conf");
-            }
-        }
-
-
-    }
-
-    protected void generateSimpleLoginConfiguration() throws IOException, ConfigurationException {
-        Properties config = new Properties();
-        config.setProperty("atlas.http.authentication.enabled", "true");
-        config.setProperty("atlas.http.authentication.type", "simple");
-
-        generateTestProperties(config);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
new file mode 100644
index 0000000..389eefe
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.eclipse.jetty.server.Server;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.testng.Assert.assertEquals;
+
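+/**
+ * Checks simple authentication: a request without user.name is rejected, one with user.name is accepted.
+ */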
+public class AtlasAuthenticationSimpleFilterTest extends BaseSecurityTest {
+    public static final String TESTUSER = "testuser";
+
+    class TestEmbeddedServer extends EmbeddedServer {
+        public TestEmbeddedServer(int port, String path) throws IOException {
+            super(port, path);
+        }
+
+        Server getServer() {
+            return server;
+        }
+    }
+
+    @Test(enabled = false)
+    public void testSimpleLogin() throws Exception {
+        String originalConf = System.getProperty("atlas.conf");
+        System.setProperty("atlas.conf", System.getProperty("user.dir"));
+        generateSimpleLoginConfiguration();
+
+        TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
+
+        try {
+            startEmbeddedServer(server.getServer());
+
+            URL url = new URL("http://localhost:23001");
+            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+            connection.connect();
+            assertEquals(connection.getResponseCode(), Response.Status.BAD_REQUEST.getStatusCode());
+
+            url = new URL("http://localhost:23001/?user.name=testuser");
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+            connection.connect();
+
+            assertEquals(connection.getResponseCode(), Response.Status.OK.getStatusCode());
+            assertEquals(RequestContext.get().getUser(), TESTUSER);
+        } finally {
+            server.getServer().stop();
+            if (originalConf != null) {
+                System.setProperty("atlas.conf", originalConf);
+            } else {
+                System.clearProperty("atlas.conf");
+            }
+        }
+    }
+
+    protected String generateSimpleLoginConfiguration() throws Exception {
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        config.setProperty("atlas.http.authentication.enabled", "true");
+        config.setProperty("atlas.http.authentication.type", "simple");
+        return writeConfiguration(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index 54d8d92..ab3aa23 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -25,9 +25,7 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
-
 import kafka.consumer.ConsumerTimeoutException;
-
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
@@ -55,7 +53,6 @@ import org.apache.atlas.utils.ParamChecker;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +62,6 @@ import org.testng.annotations.BeforeClass;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
-
 import java.util.List;
 
 /**
@@ -78,7 +74,8 @@ public abstract class BaseResourceIT {
     protected WebResource service;
     protected AtlasClient serviceClient;
     public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
-    protected static final int MAX_WAIT_TIME = 1000;
+    protected static final int MAX_WAIT_TIME = 60000;
+    protected String baseUrl;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -86,7 +83,7 @@ public abstract class BaseResourceIT {
         DefaultClientConfig config = new DefaultClientConfig();
         Client client = Client.create(config);
         Configuration configuration = ApplicationProperties.get();
-        String baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
+        baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
         client.resource(UriBuilder.fromUri(baseUrl).build());
 
         service = client.resource(UriBuilder.fromUri(baseUrl).build());
@@ -126,12 +123,12 @@ public abstract class BaseResourceIT {
 
         String entityJSON = InstanceSerialization.toJson(referenceable, true);
         System.out.println("Submitting new entity= " + entityJSON);
-        JSONArray guids = serviceClient.createEntity(entityJSON);
+        List<String> guids = serviceClient.createEntity(entityJSON);
         System.out.println("created instance for type " + typeName + ", guid: " + guids);
 
         // return the reference to created instance with guid
-        if (guids.length() > 0) {
-            return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
+        if (guids.size() > 0) {
+            return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index 720ce79..aa92bc0 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -25,7 +25,6 @@ import com.google.gson.JsonSyntaxException;
 import com.google.inject.Inject;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
-
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.EntityAuditEvent;
@@ -51,6 +50,7 @@ import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -64,7 +64,6 @@ import org.testng.annotations.Test;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.Response;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -122,6 +121,22 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     }
 
     @Test
+    public void testRequestUser() throws Exception {
+        Referenceable entity = new Referenceable(DATABASE_TYPE);
+        entity.set("name", randomString());
+        entity.set("description", randomString());
+
+        String user = "testuser";
+        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+        AtlasClient localClient = new AtlasClient(ugi, null, baseUrl);
+        String entityId = localClient.createEntity(entity).get(0);
+
+        List<EntityAuditEvent> events = serviceClient.getEntityAuditEvents(entityId, (short) 10);
+        assertEquals(events.size(), 1);
+        assertEquals(events.get(0).getUser(), user);
+    }
+
+    @Test
     //API should accept single entity (or jsonarray of entities)
     public void testSubmitSingleEntity() throws Exception {
         Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
@@ -149,7 +164,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         db.set("name", dbName);
         db.set("description", randomString());
 
-        final String dbid = serviceClient.createEntity(db).getString(0);
+        final String dbid = serviceClient.createEntity(db).get(0);
         assertEntityAudit(dbid, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
 
         waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@@ -164,8 +179,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         assertEquals(results.length(), 1);
 
         //create entity again shouldn't create another instance with same unique attribute value
-        results = serviceClient.createEntity(db);
-        assertEquals(results.length(), 0);
+        List<String> entityResults = serviceClient.createEntity(db);
+        assertEquals(entityResults.size(), 0);
         try {
             waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
                 @Override
@@ -214,7 +229,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         //create entity for the type
         Referenceable instance = new Referenceable(typeDefinition.typeName);
         instance.set("name", randomString());
-        String guid = serviceClient.createEntity(instance).getString(0);
+        String guid = serviceClient.createEntity(instance).get(0);
 
         //update type - add attribute
         typeDefinition = TypesUtil.createClassTypeDef(typeDefinition.typeName, ImmutableSet.<String>of(),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
index d497230..3d1a63a 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
  *
  */
 public class BaseSSLAndKerberosTest extends BaseSecurityTest {
+    public static final String TEST_USER_JAAS_SECTION = "TestUser";
     public static final String TESTUSER = "testuser";
     public static final String TESTPASS = "testpass";
     protected static final String DGI_URL = "https://localhost:21443/";
@@ -104,7 +105,7 @@ public class BaseSSLAndKerberosTest extends BaseSecurityTest {
         kdc.createPrincipal(TESTUSER, TESTPASS);
 
         StringBuilder jaas = new StringBuilder(1024);
-        jaas.append("TestUser {\n" +
+        jaas.append(TEST_USER_JAAS_SECTION + " {\n" +
                 "    com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
                 "};\n");
         jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
index 270a20d..54c570c 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
@@ -140,10 +140,6 @@ public class BaseSecurityTest {
     }
 
     public static String writeConfiguration(final PropertiesConfiguration configuration) throws Exception {
-        String persistDir = TestUtils.getTempDirectory();
-        TestUtils.writeConfiguration(configuration, persistDir + File.separator +
-                ApplicationProperties.APPLICATION_PROPERTIES);
-
         String confLocation = System.getProperty("atlas.conf");
         URL url;
         if (confLocation == null) {
@@ -153,6 +149,10 @@ public class BaseSecurityTest {
         }
         PropertiesConfiguration configuredProperties = new PropertiesConfiguration();
         configuredProperties.load(url);
+
+        configuredProperties.copy(configuration);
+
+        String persistDir = TestUtils.getTempDirectory();
         TestUtils.writeConfiguration(configuredProperties, persistDir + File.separator +
                 ApplicationProperties.APPLICATION_PROPERTIES);
         ApplicationProperties.forceReload();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
index 521c037..8afcc26 100755
--- a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
@@ -19,7 +19,6 @@
 package org.apache.atlas.web.security;
 
 import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.web.service.SecureEmbeddedServer;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,7 @@ import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_
 import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
 
 public class SSLTest extends BaseSSLAndKerberosTest {
-    private AtlasClient dgiCLient;
+    private AtlasClient atlasClient;
     private Path jksPath;
     private String providerUrl;
     private TestSecureEmbeddedServer secureEmbeddedServer;
@@ -76,7 +75,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
         final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
         String persistDir = writeConfiguration(configuration);
 
-        dgiCLient = new AtlasClient(DGI_URL) {
+        atlasClient = new AtlasClient(DGI_URL) {
             @Override
             protected PropertiesConfiguration getClientProperties() {
                 return configuration;
@@ -139,6 +138,6 @@ public class SSLTest extends BaseSSLAndKerberosTest {
 
     @Test
     public void testService() throws Exception {
-        dgiCLient.listTypes();
+        atlasClient.listTypes();
    }
 }


[2/2] incubator-atlas git commit: ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)

Posted by sh...@apache.org.
ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/1e3029bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/1e3029bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/1e3029bc

Branch: refs/heads/master
Commit: 1e3029bc7283e233dc816de7d83b28eddd4f4b36
Parents: 334429a
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri May 6 12:46:49 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri May 6 13:08:48 2016 +0530

----------------------------------------------------------------------
 .gitignore                                      |   6 +-
 addons/falcon-bridge/pom.xml                    |   5 -
 addons/hive-bridge/pom.xml                      |   5 -
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   8 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |  15 +-
 addons/sqoop-bridge/pom.xml                     |   5 -
 addons/storm-bridge/pom.xml                     |   5 -
 .../java/org/apache/atlas/AtlasAdminClient.java |   2 +-
 .../main/java/org/apache/atlas/AtlasClient.java | 150 ++++---
 .../org/apache/atlas/AtlasServiceException.java |  21 +-
 .../atlas/security/SecureClientUtils.java       |  25 +-
 .../java/org/apache/atlas/AtlasClientTest.java  |   3 +-
 distro/src/conf/atlas-log4j.xml                 |   2 +-
 .../notification/NotificationHookConsumer.java  | 265 ------------
 .../NotificationHookConsumerTest.java           | 183 ---------
 pom.xml                                         |  22 +-
 release-log.txt                                 |   1 +
 .../graph/GraphBackedSearchIndexer.java         |   6 +
 webapp/pom.xml                                  |   1 +
 .../java/org/apache/atlas/LocalAtlasClient.java | 260 ++++++++++++
 .../org/apache/atlas/LocalServletRequest.java   | 400 +++++++++++++++++++
 .../org/apache/atlas/examples/QuickStart.java   |   9 +-
 .../notification/NotificationHookConsumer.java  | 259 ++++++++++++
 .../web/filters/AtlasAuthenticationFilter.java  |  53 ++-
 .../apache/atlas/web/filters/AuditFilter.java   |   2 +-
 .../atlas/web/listeners/GuiceServletConfig.java |   3 +-
 .../atlas/web/resources/AdminResource.java      |   3 +-
 .../atlas/web/resources/EntityResource.java     |   4 +-
 .../org/apache/atlas/web/util/Servlets.java     |  32 ++
 .../org/apache/atlas/LocalAtlasClientTest.java  | 148 +++++++
 .../NotificationHookConsumerIT.java             |  34 +-
 .../NotificationHookConsumerTest.java           | 169 ++++++++
 .../AtlasAuthenticationKerberosFilterIT.java    | 190 ---------
 .../AtlasAuthenticationKerberosFilterTest.java  | 187 +++++++++
 .../AtlasAuthenticationSimpleFilterIT.java      |  98 -----
 .../AtlasAuthenticationSimpleFilterTest.java    |  89 +++++
 .../atlas/web/resources/BaseResourceIT.java     |  15 +-
 .../web/resources/EntityJerseyResourceIT.java   |  27 +-
 .../web/security/BaseSSLAndKerberosTest.java    |   3 +-
 .../atlas/web/security/BaseSecurityTest.java    |   8 +-
 .../org/apache/atlas/web/security/SSLTest.java  |   7 +-
 41 files changed, 1797 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b6fb8d8..b5a1c74 100755
--- a/.gitignore
+++ b/.gitignore
@@ -46,5 +46,9 @@ test-output
 .DS_Store
 *.swp
 
+#atlas data directory created when tests are run from IDE
+**/atlas.data/**
+**/${sys:atlas.data}/**
+
 #hbase package downloaded
-distro/hbase/*.tar.gz
\ No newline at end of file
+distro/hbase/*.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index 9b07c9f..14c6090 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -152,11 +152,6 @@
                                 </artifactItem>
                                 <artifactItem>
                                     <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
                                     <artifactId>hdfs-model</artifactId>
                                     <version>${project.version}</version>
                                 </artifactItem>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index e125f18..eeb2aa4 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -239,11 +239,6 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 104c0c5..d4212a1 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -181,10 +180,10 @@ public class HiveMetaStoreBridge {
 
         String entityJSON = InstanceSerialization.toJson(referenceable, true);
         LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
-        JSONArray guids = getAtlasClient().createEntity(entityJSON);
+        List<String> guids = getAtlasClient().createEntity(entityJSON);
         LOG.debug("created instance for type " + typeName + ", guid: " + guids);
 
-        return new Referenceable(guids.getString(0), referenceable.getTypeName(), null);
+        return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
     }
 
     /**
@@ -536,8 +535,7 @@ public class HiveMetaStoreBridge {
     public static void main(String[] argv) throws Exception {
         Configuration atlasConf = ApplicationProperties.get();
         String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+        AtlasClient atlasClient = new AtlasClient(atlasEndpoint);
 
         HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index b0d4c5c..317d636 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.hook;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
@@ -31,12 +30,7 @@ import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
 import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
@@ -51,7 +45,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -59,9 +52,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.File;
 import java.text.ParseException;
 import java.util.Date;
@@ -737,8 +727,6 @@ public class HiveHookIT {
 
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
-                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
 
         String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
                 HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
@@ -749,6 +737,9 @@ public class HiveHookIT {
             }
         });
 
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+
         //Change name and add comment
         oldColName = "name2";
         newColName = "name3";

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 4b5dbb1..343bb4e 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -234,11 +234,6 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index d8d98f5..45ec846 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -190,11 +190,6 @@
                                 </artifactItem>
                                 <artifactItem>
                                     <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
                                     <artifactId>hdfs-model</artifactId>
                                     <version>${project.version}</version>
                                 </artifactItem>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
index 473f72a..d2ae7f0 100644
--- a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
@@ -60,7 +60,7 @@ public class AtlasAdminClient {
         Configuration configuration = ApplicationProperties.get();
         String atlasServerUri = configuration.getString(
                 AtlasConstants.ATLAS_REST_ADDRESS_KEY, AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
-        AtlasClient atlasClient = new AtlasClient(atlasServerUri, null, null);
+        AtlasClient atlasClient = new AtlasClient(atlasServerUri);
         return handleCommand(commandLine, atlasServerUri, atlasClient);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index be34802..234af5b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -32,7 +32,6 @@ import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
 import org.apache.atlas.typesystem.types.AttributeDefinition;
 import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
@@ -78,6 +77,7 @@ public class AtlasClient {
     public static final String COUNT = "count";
     public static final String ROWS = "rows";
     public static final String DATATYPE = "dataType";
+    public static final String STATUS = "Status";
 
     public static final String EVENTS = "events";
     public static final String START_KEY = "startKey";
@@ -115,6 +115,9 @@ public class AtlasClient {
     // Setting the default value based on testing failovers while client code like quickstart is running.
     public static final int DEFAULT_NUM_RETRIES = 4;
     public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+
+    public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
+
     // Setting the default value based on testing failovers while client code like quickstart is running.
     // With number of retries, this gives a total time of about 20s for the server to start.
     public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
@@ -124,28 +127,20 @@ public class AtlasClient {
     private Configuration configuration;
 
     /**
-     * Create a new AtlasClient.
-     *
-     * @param baseUrl The URL of the Atlas server to connect to.
-     */
-    public AtlasClient(String baseUrl) {
-        this(baseUrl, null, null);
-    }
-
-    /**
-     * Create a new Atlas Client.
-     * @param baseUrl The URL of the Atlas server to connect to.
-     * @param ugi The {@link UserGroupInformation} of logged in user.
-     * @param doAsUser The user on whose behalf queries will be executed.
+     * Create a new Atlas client.
+     * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
+     *                 High Availability mode. The client will automatically determine the
+     *                 active instance on startup and also when there is a scenario of
+     *                 failover.
      */
-    public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
-        initializeState(new String[] {baseUrl}, ugi, doAsUser);
+    public AtlasClient(String... baseUrls) throws AtlasException {
+        this(getCurrentUGI(), baseUrls);
     }
 
     /**
      * Create a new Atlas client.
-     * @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
-     * @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
+     * @param ugi UserGroupInformation
+     * @param doAsUser
      * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
      *                 High Availability mode. The client will automatically determine the
      *                 active instance on startup and also when there is a scenario of
@@ -155,6 +150,23 @@ public class AtlasClient {
         initializeState(baseUrls, ugi, doAsUser);
     }
 
+    private static UserGroupInformation getCurrentUGI() throws AtlasException {
+        try {
+            return UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+            throw new AtlasException(e);
+        }
+    }
+
+    private AtlasClient(UserGroupInformation ugi, String[] baseUrls) {
+        this(ugi, ugi.getShortUserName(), baseUrls);
+    }
+
+    //Used by LocalAtlasClient
+    protected AtlasClient() {
+        //Do nothing
+    }
+
     private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
         configuration = getClientProperties();
         Client client = getClient(configuration, ugi, doAsUser);
@@ -340,7 +352,7 @@ public class AtlasClient {
         WebResource resource = getResource(service, API.STATUS);
         JSONObject response = callAPIWithResource(API.STATUS, resource, null);
         try {
-            result = response.getString("Status");
+            result = response.getString(STATUS);
         } catch (JSONException e) {
             LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
         }
@@ -418,12 +430,14 @@ public class AtlasClient {
     public List<String> createType(String typeAsJson) throws AtlasServiceException {
         LOG.debug("Creating type definition: {}", typeAsJson);
         JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+        List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
             String extractElement(JSONObject element) throws JSONException {
                 return element.getString(AtlasClient.NAME);
             }
         });
+        LOG.debug("Create type definition returned results: {}", results);
+        return results;
     }
 
     /**
@@ -470,14 +484,16 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> updateType(String typeAsJson) throws AtlasServiceException {
-        LOG.debug("Updating tyep definition: {}", typeAsJson);
+        LOG.debug("Updating type definition: {}", typeAsJson);
         JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+        List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
             String extractElement(JSONObject element) throws JSONException {
                 return element.getString(AtlasClient.NAME);
             }
         });
+        LOG.debug("Update type definition returned results: {}", results);
+        return results;
     }
 
     /**
@@ -495,10 +511,11 @@ public class AtlasClient {
         return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>());
     }
 
-    public String getType(String typeName) throws AtlasServiceException {
+    public TypesDef getType(String typeName) throws AtlasServiceException {
         try {
             JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
-            return response.getString(DEFINITION);
+            String typeJson = response.getString(DEFINITION);
+            return TypesSerialization.fromJson(typeJson);
         } catch (AtlasServiceException e) {
             if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
                 return null;
@@ -515,14 +532,12 @@ public class AtlasClient {
      * @return json array of guids
      * @throws AtlasServiceException
      */
-    public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
+    protected List<String> createEntity(JSONArray entities) throws AtlasServiceException {
         LOG.debug("Creating entities: {}", entities);
         JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
-        try {
-            return response.getJSONArray(GUID);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(API.GET_ENTITY, e);
-        }
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Create entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -531,15 +546,15 @@ public class AtlasClient {
      * @return json array of guids
      * @throws AtlasServiceException
      */
-    public JSONArray createEntity(String... entitiesAsJson) throws AtlasServiceException {
+    public List<String> createEntity(String... entitiesAsJson) throws AtlasServiceException {
         return createEntity(new JSONArray(Arrays.asList(entitiesAsJson)));
     }
 
-    public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
+    public List<String> createEntity(Referenceable... entities) throws AtlasServiceException {
         return createEntity(Arrays.asList(entities));
     }
 
-    public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
+    public List<String> createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entityArray = getEntitiesArray(entities);
         return createEntity(entityArray);
     }
@@ -559,19 +574,21 @@ public class AtlasClient {
      * @return json array of guids which were updated/created
      * @throws AtlasServiceException
      */
-    public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
+    public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException {
         return updateEntities(Arrays.asList(entities));
     }
 
-    public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
+    protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Updating entities: {}", entities);
+        JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString());
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Update entities returned results: {}", results);
+        return results;
+    }
+
+    public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entitiesArray = getEntitiesArray(entities);
-        LOG.debug("Updating entities: {}", entitiesArray);
-        JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
-        try {
-            return response.getJSONArray(GUID);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(API.UPDATE_ENTITY, e);
-        }
+        return updateEntities(entitiesArray);
     }
 
     /**
@@ -651,6 +668,8 @@ public class AtlasClient {
                                Referenceable entity) throws AtlasServiceException {
         final API api = API.UPDATE_ENTITY_PARTIAL;
         String entityJson = InstanceSerialization.toJson(entity, true);
+        LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+                uniqueAttributeName, uniqueAttributeValue, entityJson);
         JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
             @Override
             public WebResource createResource() {
@@ -661,10 +680,16 @@ public class AtlasClient {
                 return resource;
             }
         });
+        String result = getString(response, GUID);
+        LOG.debug("Update entity returned result: {}", result);
+        return result;
+    }
+
+    protected String getString(JSONObject jsonObject, String parameter) throws AtlasServiceException {
         try {
-            return response.getString(GUID);
+            return jsonObject.getString(parameter);
         } catch (JSONException e) {
-            throw new AtlasServiceException(api, e);
+            throw new AtlasServiceException(e);
         }
     }
 
@@ -676,6 +701,7 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+        LOG.debug("Deleting entities: {}", guids);
         JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
             @Override
             public WebResource createResource() {
@@ -687,7 +713,9 @@ public class AtlasClient {
                 return resource;
             }
         });
-        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -699,13 +727,17 @@ public class AtlasClient {
      */
     public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
             throws AtlasServiceException {
+        LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+                uniqueAttributeValue);
         API api = API.DELETE_ENTITY;
         WebResource resource = getResource(api);
         resource = resource.queryParam(TYPE, entityType);
         resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
         resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
         JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
-        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -789,13 +821,13 @@ public class AtlasClient {
         return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
     }
 
-    private class ExtractOperation<T, U> {
+    protected class ExtractOperation<T, U> {
         T extractElement(U element) throws JSONException {
             return (T) element;
         }
     }
 
-    private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
+    protected <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
             throws AtlasServiceException {
         try {
             JSONArray results = jsonResponse.getJSONArray(key);
@@ -1011,22 +1043,12 @@ public class AtlasClient {
     private class AtlasClientContext {
         private String[] baseUrls;
         private Client client;
-        private final UserGroupInformation ugi;
-        private final String doAsUser;
+        private String doAsUser;
+        private UserGroupInformation ugi;
 
         public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
             this.baseUrls = baseUrls;
             this.client = client;
-            this.ugi = ugi;
-            this.doAsUser = doAsUser;
-        }
-
-        public UserGroupInformation getUgi() {
-            return ugi;
-        }
-
-        public String getDoAsUser() {
-            return doAsUser;
         }
 
         public Client getClient() {
@@ -1036,6 +1058,14 @@ public class AtlasClient {
         public String[] getBaseUrls() {
             return baseUrls;
         }
+
+        public String getDoAsUser() {
+            return doAsUser;
+        }
+
+        public UserGroupInformation getUgi() {
+            return ugi;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasServiceException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasServiceException.java b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
index 6f68a71..2117a6b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasServiceException.java
+++ b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
@@ -19,6 +19,10 @@
 package org.apache.atlas;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.WebApplicationException;
 
 public class AtlasServiceException extends Exception {
     private ClientResponse.Status status;
@@ -27,12 +31,19 @@ public class AtlasServiceException extends Exception {
         super("Metadata service API " + api + " failed", e);
     }
 
+    public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException {
+        this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
+                ((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
+    }
+
+    private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) {
+        super("Metadata service API " + api + " failed with status " + status.getStatusCode() + "(" +
+                status.getReasonPhrase() + ") Response Body (" + response + ")");
+        this.status = status;
+    }
+
     public AtlasServiceException(AtlasClient.API api, ClientResponse response) {
-        super("Metadata service API " + api + " failed with status " +
-                response.getClientResponseStatus().getStatusCode() + "(" +
-                response.getClientResponseStatus().getReasonPhrase() + ") Response Body (" +
-                response.getEntity(String.class) + ")");
-        this.status = response.getClientResponseStatus();
+        this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
     }
 
     public AtlasServiceException(Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
index d3b474a..1686112 100644
--- a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
+++ b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
@@ -20,6 +20,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import org.apache.atlas.AtlasException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
@@ -60,7 +61,7 @@ public class SecureClientUtils {
 
 
     public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
-            org.apache.commons.configuration.Configuration clientConfig, final String doAsUser,
+            org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
             final UserGroupInformation ugi) {
         config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
         Configuration conf = new Configuration();
@@ -80,17 +81,16 @@ public class SecureClientUtils {
         final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
         HttpURLConnectionFactory httpURLConnectionFactory = null;
         try {
-            UserGroupInformation ugiToUse = ugi != null ?
-                ugi : UserGroupInformation.getCurrentUser();
+            UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser();
             final UserGroupInformation actualUgi =
-                (ugiToUse.getAuthenticationMethod() ==
-                 UserGroupInformation.AuthenticationMethod.PROXY)
-                    ? ugiToUse.getRealUser()
-                    : ugiToUse;
-            LOG.info("Real User: {}, is from ticket cache? {}",
-                     actualUgi,
-                     actualUgi.isLoginTicketBased());
+                    (ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY)
+                    ? ugiToUse.getRealUser() : ugiToUse;
+            LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased());
+            if (StringUtils.isEmpty(doAsUser)) {
+                doAsUser = actualUgi.getShortUserName();
+            }
             LOG.info("doAsUser: {}", doAsUser);
+            final String finalDoAsUser = doAsUser;
             httpURLConnectionFactory = new HttpURLConnectionFactory() {
                 @Override
                 public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
@@ -99,9 +99,8 @@ public class SecureClientUtils {
                             @Override
                             public HttpURLConnection run() throws Exception {
                                 try {
-                                    return new DelegationTokenAuthenticatedURL(
-                                        finalAuthenticator, connConfigurator)
-                                        .openConnection(url, token, doAsUser);
+                                    return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator)
+                                        .openConnection(url, token, finalDoAsUser);
                                 } catch (Exception e) {
                                     throw new IOException(e);
                                 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 8911bf5..0e80573 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -30,14 +30,12 @@ import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
-
 import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -326,6 +324,7 @@ public class AtlasClientTest {
                 thenReturn(response);
 
         when(resourceCreator.createResource()).thenReturn(resourceObject);
+        when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");
 
         AtlasClient atlasClient = getClientForTest("http://localhost:31000");
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 1ac4082..17bf68f 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -61,7 +61,7 @@
     </logger>
 
     <root>
-        <priority value="info"/>
+        <priority value="warn"/>
         <appender-ref ref="FILE"/>
     </root>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
deleted file mode 100644
index 1f2df3e..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import kafka.consumer.ConsumerTimeoutException;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.service.Service;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Consumer of notifications from hooks e.g., hive hook etc.
- */
-@Singleton
-public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
-
-    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
-    public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
-    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-
-    private NotificationInterface notificationInterface;
-    private ExecutorService executors;
-    private String atlasEndpoint;
-    private Configuration applicationProperties;
-    private List<HookConsumer> consumers;
-
-    @Inject
-    public NotificationHookConsumer(NotificationInterface notificationInterface) {
-        this.notificationInterface = notificationInterface;
-    }
-
-    @Override
-    public void start() throws AtlasException {
-        Configuration configuration = ApplicationProperties.get();
-        startInternal(configuration, null);
-    }
-
-    void startInternal(Configuration configuration,
-                       ExecutorService executorService) {
-        this.applicationProperties = configuration;
-        this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
-        if (consumers == null) {
-            consumers = new ArrayList<>();
-        }
-        if (executorService != null) {
-            executors = executorService;
-        }
-        if (!HAConfiguration.isHAEnabled(configuration)) {
-            LOG.info("HA is disabled, starting consumers inline.");
-            startConsumers(executorService);
-        }
-    }
-
-    private void startConsumers(ExecutorService executorService) {
-        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
-        if (executorService == null) {
-            executorService = Executors.newFixedThreadPool(notificationConsumers.size());
-        }
-        executors = executorService;
-        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
-            HookConsumer hookConsumer = new HookConsumer(consumer);
-            consumers.add(hookConsumer);
-            executors.submit(hookConsumer);
-        }
-    }
-
-    @Override
-    public void stop() {
-        //Allow for completion of outstanding work
-        notificationInterface.close();
-        try {
-            if (executors != null) {
-                stopConsumerThreads();
-                executors.shutdownNow();
-                if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                }
-                executors = null;
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Failure in shutting down consumers");
-        }
-    }
-
-    private void stopConsumerThreads() {
-        if (consumers != null) {
-            for (HookConsumer consumer : consumers) {
-                consumer.stop();
-            }
-            consumers.clear();
-        }
-    }
-
-    /**
-     * Start Kafka consumer threads that read from Kafka topic when server is activated.
-     *
-     * Since the consumers create / update entities to the shared backend store, only the active instance
-     * should perform this activity. Hence, these threads are started only on server activation.
-     */
-    @Override
-    public void instanceIsActive() {
-        LOG.info("Reacting to active state: initializing Kafka consumers");
-        startConsumers(executors);
-    }
-
-    /**
-     * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
-     *
-     * Since the consumers create / update entities to the shared backend store, only the active instance
-     * should perform this activity. Hence, these threads are stopped only on server deactivation.
-     */
-    @Override
-    public void instanceIsPassive() {
-        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
-        stop();
-    }
-
-    static class Timer {
-        public void sleep(int interval) throws InterruptedException {
-            Thread.sleep(interval);
-        }
-    }
-
-    class HookConsumer implements Runnable {
-        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
-        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-
-        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
-            this.consumer = consumer;
-        }
-
-        private boolean hasNext() {
-            try {
-                return consumer.hasNext();
-            } catch (ConsumerTimeoutException e) {
-                return false;
-            }
-        }
-
-        @Override
-        public void run() {
-            shouldRun.set(true);
-
-            if (!serverAvailable(new NotificationHookConsumer.Timer())) {
-                return;
-            }
-
-            while (shouldRun.get()) {
-                try {
-                    if (hasNext()) {
-                        HookNotification.HookNotificationMessage message = consumer.next();
-                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
-                        AtlasClient atlasClient = getAtlasClient(ugi);
-
-                        try {
-                            switch (message.getType()) {
-                            case ENTITY_CREATE:
-                                HookNotification.EntityCreateRequest createRequest =
-                                        (HookNotification.EntityCreateRequest) message;
-                                atlasClient.createEntity(createRequest.getEntities());
-                                break;
-
-                            case ENTITY_PARTIAL_UPDATE:
-                                HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
-                                        (HookNotification.EntityPartialUpdateRequest) message;
-                                atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                        partialUpdateRequest.getAttribute(),
-                                        partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
-                                break;
-
-                            case ENTITY_DELETE:
-                                HookNotification.EntityDeleteRequest deleteRequest =
-                                    (HookNotification.EntityDeleteRequest) message;
-                                atlasClient.deleteEntity(deleteRequest.getTypeName(),
-                                    deleteRequest.getAttribute(),
-                                    deleteRequest.getAttributeValue());
-                                break;
-
-                            case ENTITY_FULL_UPDATE:
-                                HookNotification.EntityUpdateRequest updateRequest =
-                                        (HookNotification.EntityUpdateRequest) message;
-                                atlasClient.updateEntities(updateRequest.getEntities());
-                                break;
-
-                            default:
-                                throw new IllegalStateException("Unhandled exception!");
-                            }
-                        } catch (Exception e) {
-                            //todo handle failures
-                            LOG.warn("Error handling message {}", message, e);
-                        }
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Failure in NotificationHookConsumer", t);
-                }
-            }
-        }
-
-        protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-            return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
-        }
-
-        boolean serverAvailable(Timer timer) {
-            try {
-                AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
-                while (!atlasClient.isServerReady()) {
-                    try {
-                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
-                                SERVER_READY_WAIT_TIME_MS);
-                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
-                    } catch (InterruptedException e) {
-                        LOG.info("Interrupted while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
-                        return false;
-                    }
-                }
-            } catch (Throwable e) {
-                LOG.info(
-                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
-                return false;
-            }
-            LOG.info("Atlas Server is ready, can start reading Kafka events.");
-            return true;
-        }
-
-        public void stop() {
-            shouldRun.set(false);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
deleted file mode 100644
index 177de6d..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import static org.mockito.Mockito.*;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
-
-public class NotificationHookConsumerTest {
-
-    @Mock
-    private NotificationInterface notificationInterface;
-
-    @Mock
-    private AtlasClient atlasClient;
-
-    @Mock
-    private Configuration configuration;
-
-    @Mock
-    private ExecutorService executorService;
-
-    @BeforeMethod
-    public void setup() {
-        MockitoAnnotations.initMocks(this);
-    }
-
-    @Test
-    public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenReturn(true);
-
-        assertTrue(hookConsumer.serverAvailable(timer));
-
-        verifyZeroInteractions(timer);
-    }
-
-    @Test
-    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
-
-        assertTrue(hookConsumer.serverAvailable(timer));
-
-        verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
-    }
-
-    @Test
-    public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
-        when(atlasClient.isServerReady()).thenReturn(false);
-
-        assertFalse(hookConsumer.serverAvailable(timer));
-    }
-
-    @Test
-    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
-                new Exception()));
-
-        assertFalse(hookConsumer.serverAvailable(timer));
-    }
-
-    @Test
-    public void testConsumersStartedIfHAIsDisabled() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
-        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
-    }
-
-    @Test
-    public void testConsumersAreNotStartedIfHAIsEnabled() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        verifyZeroInteractions(notificationInterface);
-    }
-
-    @Test
-    public void testConsumersAreStartedWhenInstanceBecomesActive() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        notificationHookConsumer.instanceIsActive();
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
-        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
-    }
-
-    @Test
-    public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        notificationHookConsumer.instanceIsPassive();
-        verify(notificationInterface).close();
-        verify(executorService).shutdownNow();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 27d44cf..7b872c3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1371,7 +1371,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-war-plugin</artifactId>
-                    <version>2.4</version>
+                    <version>2.6</version>
                 </plugin>
 
                 <plugin>
@@ -1657,15 +1657,17 @@
                         <exclude>**/overlays/**</exclude>
                         <exclude>dev-support/**</exclude>
                         <exclude>**/users-credentials.properties</exclude>
-			<exclude>**/public/css/animate.min.css</exclude>
-			<exclude>**/public/css/fonts/**</exclude>
-			<exclude>**/public/css/font-awesome.min.css</exclude>
-			<exclude>**/public/js/require-handlebars-plugin/**</exclude>
-			<exclude>**/node_modules/**</exclude>
-			<!-- All the npm plugins are copied here, so exclude it -->
-			<exclude>**/public/js/libs/**</exclude>
-  
-
+                        <exclude>**/public/css/animate.min.css</exclude>
+                        <exclude>**/public/css/fonts/**</exclude>
+                        <exclude>**/public/css/font-awesome.min.css</exclude>
+                        <exclude>**/public/js/require-handlebars-plugin/**</exclude>
+                        <exclude>**/node_modules/**</exclude>
+                        <!-- All the npm plugins are copied here, so exclude it -->
+                        <exclude>**/public/js/libs/**</exclude>
+
+                        <!-- atlas data directory is created when tests are run from IDE -->
+                        <exclude>**/atlas.data/**</exclude>
+                        <exclude>**/${sys:atlas.data}/**</exclude>
                     </excludes>
                 </configuration>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a2de92a..7c17a9d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
 ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth)
 ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth)
 ATLAS-659 atlas_start fails on Windows (dkantor via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index d83c08c..9b9fe35 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -106,6 +106,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             // create a composite index for entity state
             createCompositeAndMixedIndex(management, Constants.STATE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
 
+            // create a composite index for the entity timestamp property
+            createCompositeAndMixedIndex(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
+            // create a composite index for the entity modification timestamp property
+            createCompositeAndMixedIndex(management, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
             // create a composite and mixed index for type since it can be combined with other keys
             createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
                     true);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 393863c..de48c15 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -301,6 +301,7 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-war-plugin</artifactId>
                 <configuration>
+                    <archiveClasses>true</archiveClasses>
                     <attachClasses>true</attachClasses>
                     <overlays>
                         <!-- <overlay>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
new file mode 100644
index 0000000..c6ed85d
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.inject.Inject;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.web.filters.AuditFilter;
+import org.apache.atlas.web.resources.EntityResource;
+import org.apache.atlas.web.service.ServiceState;
+import org.apache.atlas.web.util.DateTimeHelper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer.
+ */
+public class LocalAtlasClient extends AtlasClient {
+    private static final String LOCALHOST = "localhost";
+    private static final String CLASS =  LocalAtlasClient.class.getSimpleName();
+
+    public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class);
+
+    private final EntityResource entityResource;
+
+    private final ServiceState serviceState;
+
+    @Inject
+    public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) {
+        super();
+        this.serviceState = serviceState;
+        this.entityResource = entityResource;
+    }
+
+    private String user;
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    private void setRequestContext() {
+        RequestContext requestContext = RequestContext.createContext();
+        requestContext.setUser(user);
+    }
+
+    @Override
+    public boolean isServerReady() throws AtlasServiceException {
+        return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
+    }
+
+    @Override
+    protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Creating entities: {}", entities);
+        EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.submit(new LocalServletRequest(entities.toString()));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Create entities returned results: {}", results);
+        return results;
+    }
+
+    @Override
+    protected List<String> updateEntities(final JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Updating entities: {}", entities);
+        EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.updateEntities(new LocalServletRequest(entities.toString()));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Update entities returned results: {}", results);
+        return results;
+    }
+
+    private abstract class EntityOperation {
+        private final API api;
+
+        public EntityOperation(API api) {
+            this.api = api;
+        }
+
+        public JSONObject run() throws AtlasServiceException {
+            setRequestContext();
+            AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
+
+            try {
+                Response response = invoke();
+                return (JSONObject) response.getEntity();
+            } catch(WebApplicationException e) {
+                try {
+                    throw new AtlasServiceException(api, e);
+                } catch (JSONException e1) {
+                    throw new AtlasServiceException(e);
+                }
+            }
+        }
+
+        abstract Response invoke();
+    }
+
+    @Override
+    public String updateEntity(final String entityType, final String uniqueAttributeName,
+                               final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
+        final String entityJson = InstanceSerialization.toJson(entity, true);
+        LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+                uniqueAttributeName, uniqueAttributeValue, entityJson);
+        EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) {
+            @Override
+            Response invoke() {
+                return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue,
+                        new LocalServletRequest(entityJson));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        String result = getString(response, GUID);
+        LOG.debug("Update entity returned result: {}", result);
+        return result;
+    }
+
+    @Override
+    public List<String> deleteEntity(final String entityType, final String uniqueAttributeName,
+                                     final String uniqueAttributeValue) throws AtlasServiceException {
+        LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+                uniqueAttributeValue);
+        EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue);
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
+    }
+
+    @Override
+    public String getAdminStatus() throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> createType(String typeAsJson) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> updateType(String typeAsJson) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> listTypes() throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public TypesDef getType(String typeName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+
+    @Override
+    public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public Referenceable getEntity(String guid) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public Referenceable getEntity(final String entityType, final String attribute, final String value)
+            throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> listEntities(final String entityType) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
+            throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray search(final String searchQuery) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray searchByDSL(final String query) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject searchByFullText(final String query) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
new file mode 100644
index 0000000..36a01b2
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
+import javax.servlet.http.Part;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+
+public class LocalServletRequest implements HttpServletRequest {
+    private final String payload;
+
+    LocalServletRequest(String payload) {
+        this.payload = payload;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+
+    @Override
+    public String getAuthType() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Cookie[] getCookies() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public long getDateHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getHeaders(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getHeaderNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getIntHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getMethod() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getPathInfo() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getPathTranslated() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getContextPath() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getQueryString() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRemoteUser() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isUserInRole(String role) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Principal getUserPrincipal() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRequestedSessionId() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRequestURI() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public StringBuffer getRequestURL() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getServletPath() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public HttpSession getSession(boolean create) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public HttpSession getSession() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String changeSessionId() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdValid() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromCookie() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromURL() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromUrl() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean authenticate(HttpServletResponse response) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void login(String username, String password) throws ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void logout() throws ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Collection<Part> getParts() throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Part getPart(String name) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no upgrade handler is instantiated.
+     *
+     * <p>The checked exceptions in the signature are declared but never thrown by
+     * this body.
+     *
+     * @param handlerClass unused
+     * @param <T> the handler type named by {@code handlerClass}
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no attribute is looked up.
+     *
+     * @param name unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Object getAttribute(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no attribute names are enumerated.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Enumeration<String> getAttributeNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no character encoding is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getCharacterEncoding() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: the requested encoding is neither validated nor stored.
+     *
+     * <p>{@code UnsupportedEncodingException} is declared but never thrown by this
+     * body.
+     *
+     * @param env unused
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no content length is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public int getContentLength() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no content length is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public long getContentLengthLong() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no content type is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getContentType() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no input stream is opened or returned.
+     *
+     * <p>{@code IOException} is declared but never thrown by this body.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public ServletInputStream getInputStream() throws IOException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no parameter is looked up.
+     *
+     * @param name unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getParameter(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no parameter names are enumerated.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Enumeration<String> getParameterNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no parameter values are looked up.
+     *
+     * @param name unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String[] getParameterValues(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no parameter map is built or returned.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Map<String, String[]> getParameterMap() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no protocol is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getProtocol() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no scheme is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getScheme() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no server name is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getServerName() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no server port is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public int getServerPort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no reader is opened or returned.
+     *
+     * <p>{@code IOException} is declared but never thrown by this body.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public BufferedReader getReader() throws IOException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no remote address is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getRemoteAddr() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no remote host is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getRemoteHost() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: the attribute is not stored.
+     *
+     * @param name unused
+     * @param o unused
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public void setAttribute(String name, Object o) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no attribute is removed.
+     *
+     * @param name unused
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public void removeAttribute(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no locale is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Locale getLocale() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no locales are enumerated.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public Enumeration<Locale> getLocales() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: channel security is never evaluated.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public boolean isSecure() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no request dispatcher is resolved.
+     *
+     * @param path unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public RequestDispatcher getRequestDispatcher(String path) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: the given path is not translated.
+     *
+     * @param path unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getRealPath(String path) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no remote port is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public int getRemotePort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no local host name is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getLocalName() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no local address is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public String getLocalAddr() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no local port is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public int getLocalPort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no servlet context is returned.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public ServletContext getServletContext() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: asynchronous processing is never started.
+     *
+     * <p>The {@code IllegalStateException} named in the throws clause is the same
+     * type this body throws unconditionally.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public AsyncContext startAsync() throws IllegalStateException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: asynchronous processing is never started and neither argument
+     * is inspected.
+     *
+     * @param servletRequest unused
+     * @param servletResponse unused
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
+            throws IllegalStateException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: asynchronous state is never evaluated.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public boolean isAsyncStarted() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: this query itself fails rather than answering {@code false}.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public boolean isAsyncSupported() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no asynchronous context is returned.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public AsyncContext getAsyncContext() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /**
+     * Not supported: no dispatcher type is reported.
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always, with the message "Not supported"
+     */
+    @Override
+    public DispatcherType getDispatcherType() {
+        throw new IllegalStateException("Not supported");
+    }
+}