You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/06 07:38:59 UTC

[2/2] incubator-atlas git commit: ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)

ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/1e3029bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/1e3029bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/1e3029bc

Branch: refs/heads/master
Commit: 1e3029bc7283e233dc816de7d83b28eddd4f4b36
Parents: 334429a
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri May 6 12:46:49 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri May 6 13:08:48 2016 +0530

----------------------------------------------------------------------
 .gitignore                                      |   6 +-
 addons/falcon-bridge/pom.xml                    |   5 -
 addons/hive-bridge/pom.xml                      |   5 -
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   8 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |  15 +-
 addons/sqoop-bridge/pom.xml                     |   5 -
 addons/storm-bridge/pom.xml                     |   5 -
 .../java/org/apache/atlas/AtlasAdminClient.java |   2 +-
 .../main/java/org/apache/atlas/AtlasClient.java | 150 ++++---
 .../org/apache/atlas/AtlasServiceException.java |  21 +-
 .../atlas/security/SecureClientUtils.java       |  25 +-
 .../java/org/apache/atlas/AtlasClientTest.java  |   3 +-
 distro/src/conf/atlas-log4j.xml                 |   2 +-
 .../notification/NotificationHookConsumer.java  | 265 ------------
 .../NotificationHookConsumerTest.java           | 183 ---------
 pom.xml                                         |  22 +-
 release-log.txt                                 |   1 +
 .../graph/GraphBackedSearchIndexer.java         |   6 +
 webapp/pom.xml                                  |   1 +
 .../java/org/apache/atlas/LocalAtlasClient.java | 260 ++++++++++++
 .../org/apache/atlas/LocalServletRequest.java   | 400 +++++++++++++++++++
 .../org/apache/atlas/examples/QuickStart.java   |   9 +-
 .../notification/NotificationHookConsumer.java  | 259 ++++++++++++
 .../web/filters/AtlasAuthenticationFilter.java  |  53 ++-
 .../apache/atlas/web/filters/AuditFilter.java   |   2 +-
 .../atlas/web/listeners/GuiceServletConfig.java |   3 +-
 .../atlas/web/resources/AdminResource.java      |   3 +-
 .../atlas/web/resources/EntityResource.java     |   4 +-
 .../org/apache/atlas/web/util/Servlets.java     |  32 ++
 .../org/apache/atlas/LocalAtlasClientTest.java  | 148 +++++++
 .../NotificationHookConsumerIT.java             |  34 +-
 .../NotificationHookConsumerTest.java           | 169 ++++++++
 .../AtlasAuthenticationKerberosFilterIT.java    | 190 ---------
 .../AtlasAuthenticationKerberosFilterTest.java  | 187 +++++++++
 .../AtlasAuthenticationSimpleFilterIT.java      |  98 -----
 .../AtlasAuthenticationSimpleFilterTest.java    |  89 +++++
 .../atlas/web/resources/BaseResourceIT.java     |  15 +-
 .../web/resources/EntityJerseyResourceIT.java   |  27 +-
 .../web/security/BaseSSLAndKerberosTest.java    |   3 +-
 .../atlas/web/security/BaseSecurityTest.java    |   8 +-
 .../org/apache/atlas/web/security/SSLTest.java  |   7 +-
 41 files changed, 1797 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b6fb8d8..b5a1c74 100755
--- a/.gitignore
+++ b/.gitignore
@@ -46,5 +46,9 @@ test-output
 .DS_Store
 *.swp
 
+#atlas data directory creates when tests are run from IDE
+**/atlas.data/**
+**/${sys:atlas.data}/**
+
 #hbase package downloaded
-distro/hbase/*.tar.gz
\ No newline at end of file
+distro/hbase/*.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index 9b07c9f..14c6090 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -152,11 +152,6 @@
                                 </artifactItem>
                                 <artifactItem>
                                     <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
                                     <artifactId>hdfs-model</artifactId>
                                     <version>${project.version}</version>
                                 </artifactItem>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index e125f18..eeb2aa4 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -239,11 +239,6 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 104c0c5..d4212a1 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -181,10 +180,10 @@ public class HiveMetaStoreBridge {
 
         String entityJSON = InstanceSerialization.toJson(referenceable, true);
         LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
-        JSONArray guids = getAtlasClient().createEntity(entityJSON);
+        List<String> guids = getAtlasClient().createEntity(entityJSON);
         LOG.debug("created instance for type " + typeName + ", guid: " + guids);
 
-        return new Referenceable(guids.getString(0), referenceable.getTypeName(), null);
+        return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
     }
 
     /**
@@ -536,8 +535,7 @@ public class HiveMetaStoreBridge {
     public static void main(String[] argv) throws Exception {
         Configuration atlasConf = ApplicationProperties.get();
         String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+        AtlasClient atlasClient = new AtlasClient(atlasEndpoint);
 
         HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index b0d4c5c..317d636 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.hook;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
@@ -31,12 +30,7 @@ import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
 import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
@@ -51,7 +45,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -59,9 +52,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.File;
 import java.text.ParseException;
 import java.util.Date;
@@ -737,8 +727,6 @@ public class HiveHookIT {
 
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
-                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
 
         String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
                 HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
@@ -749,6 +737,9 @@ public class HiveHookIT {
             }
         });
 
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+
         //Change name and add comment
         oldColName = "name2";
         newColName = "name3";

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 4b5dbb1..343bb4e 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -234,11 +234,6 @@
                                     <version>${project.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.scala-lang</groupId>
                                     <artifactId>scala-compiler</artifactId>
                                     <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index d8d98f5..45ec846 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -190,11 +190,6 @@
                                 </artifactItem>
                                 <artifactItem>
                                     <groupId>${project.groupId}</groupId>
-                                    <artifactId>atlas-server-api</artifactId>
-                                    <version>${project.version}</version>
-                                </artifactItem>
-                                <artifactItem>
-                                    <groupId>${project.groupId}</groupId>
                                     <artifactId>hdfs-model</artifactId>
                                     <version>${project.version}</version>
                                 </artifactItem>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
index 473f72a..d2ae7f0 100644
--- a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
@@ -60,7 +60,7 @@ public class AtlasAdminClient {
         Configuration configuration = ApplicationProperties.get();
         String atlasServerUri = configuration.getString(
                 AtlasConstants.ATLAS_REST_ADDRESS_KEY, AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
-        AtlasClient atlasClient = new AtlasClient(atlasServerUri, null, null);
+        AtlasClient atlasClient = new AtlasClient(atlasServerUri);
         return handleCommand(commandLine, atlasServerUri, atlasClient);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index be34802..234af5b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -32,7 +32,6 @@ import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
 import org.apache.atlas.typesystem.types.AttributeDefinition;
 import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
@@ -78,6 +77,7 @@ public class AtlasClient {
     public static final String COUNT = "count";
     public static final String ROWS = "rows";
     public static final String DATATYPE = "dataType";
+    public static final String STATUS = "Status";
 
     public static final String EVENTS = "events";
     public static final String START_KEY = "startKey";
@@ -115,6 +115,9 @@ public class AtlasClient {
     // Setting the default value based on testing failovers while client code like quickstart is running.
     public static final int DEFAULT_NUM_RETRIES = 4;
     public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+
+    public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
+
     // Setting the default value based on testing failovers while client code like quickstart is running.
     // With number of retries, this gives a total time of about 20s for the server to start.
     public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
@@ -124,28 +127,20 @@ public class AtlasClient {
     private Configuration configuration;
 
     /**
-     * Create a new AtlasClient.
-     *
-     * @param baseUrl The URL of the Atlas server to connect to.
-     */
-    public AtlasClient(String baseUrl) {
-        this(baseUrl, null, null);
-    }
-
-    /**
-     * Create a new Atlas Client.
-     * @param baseUrl The URL of the Atlas server to connect to.
-     * @param ugi The {@link UserGroupInformation} of logged in user.
-     * @param doAsUser The user on whose behalf queries will be executed.
+     * Create a new Atlas client.
+     * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
+     *                 High Availability mode. The client will automatically determine the
+     *                 active instance on startup and also when there is a scenario of
+     *                 failover.
      */
-    public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
-        initializeState(new String[] {baseUrl}, ugi, doAsUser);
+    public AtlasClient(String... baseUrls) throws AtlasException {
+        this(getCurrentUGI(), baseUrls);
     }
 
     /**
      * Create a new Atlas client.
-     * @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
-     * @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
+     * @param ugi UserGroupInformation
+     * @param doAsUser
      * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
      *                 High Availability mode. The client will automatically determine the
      *                 active instance on startup and also when there is a scenario of
@@ -155,6 +150,23 @@ public class AtlasClient {
         initializeState(baseUrls, ugi, doAsUser);
     }
 
+    private static UserGroupInformation getCurrentUGI() throws AtlasException {
+        try {
+            return UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+            throw new AtlasException(e);
+        }
+    }
+
+    private AtlasClient(UserGroupInformation ugi, String[] baseUrls) {
+        this(ugi, ugi.getShortUserName(), baseUrls);
+    }
+
+    //Used by LocalAtlasClient
+    protected AtlasClient() {
+        //Do nothing
+    }
+
     private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
         configuration = getClientProperties();
         Client client = getClient(configuration, ugi, doAsUser);
@@ -340,7 +352,7 @@ public class AtlasClient {
         WebResource resource = getResource(service, API.STATUS);
         JSONObject response = callAPIWithResource(API.STATUS, resource, null);
         try {
-            result = response.getString("Status");
+            result = response.getString(STATUS);
         } catch (JSONException e) {
             LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
         }
@@ -418,12 +430,14 @@ public class AtlasClient {
     public List<String> createType(String typeAsJson) throws AtlasServiceException {
         LOG.debug("Creating type definition: {}", typeAsJson);
         JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+        List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
             String extractElement(JSONObject element) throws JSONException {
                 return element.getString(AtlasClient.NAME);
             }
         });
+        LOG.debug("Create type definition returned results: {}", results);
+        return results;
     }
 
     /**
@@ -470,14 +484,16 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> updateType(String typeAsJson) throws AtlasServiceException {
-        LOG.debug("Updating tyep definition: {}", typeAsJson);
+        LOG.debug("Updating type definition: {}", typeAsJson);
         JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+        List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
             String extractElement(JSONObject element) throws JSONException {
                 return element.getString(AtlasClient.NAME);
             }
         });
+        LOG.debug("Update type definition returned results: {}", results);
+        return results;
     }
 
     /**
@@ -495,10 +511,11 @@ public class AtlasClient {
         return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>());
     }
 
-    public String getType(String typeName) throws AtlasServiceException {
+    public TypesDef getType(String typeName) throws AtlasServiceException {
         try {
             JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
-            return response.getString(DEFINITION);
+            String typeJson = response.getString(DEFINITION);
+            return TypesSerialization.fromJson(typeJson);
         } catch (AtlasServiceException e) {
             if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
                 return null;
@@ -515,14 +532,12 @@ public class AtlasClient {
      * @return json array of guids
      * @throws AtlasServiceException
      */
-    public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
+    protected List<String> createEntity(JSONArray entities) throws AtlasServiceException {
         LOG.debug("Creating entities: {}", entities);
         JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
-        try {
-            return response.getJSONArray(GUID);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(API.GET_ENTITY, e);
-        }
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Create entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -531,15 +546,15 @@ public class AtlasClient {
      * @return json array of guids
      * @throws AtlasServiceException
      */
-    public JSONArray createEntity(String... entitiesAsJson) throws AtlasServiceException {
+    public List<String> createEntity(String... entitiesAsJson) throws AtlasServiceException {
         return createEntity(new JSONArray(Arrays.asList(entitiesAsJson)));
     }
 
-    public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
+    public List<String> createEntity(Referenceable... entities) throws AtlasServiceException {
         return createEntity(Arrays.asList(entities));
     }
 
-    public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
+    public List<String> createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entityArray = getEntitiesArray(entities);
         return createEntity(entityArray);
     }
@@ -559,19 +574,21 @@ public class AtlasClient {
      * @return json array of guids which were updated/created
      * @throws AtlasServiceException
      */
-    public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
+    public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException {
         return updateEntities(Arrays.asList(entities));
     }
 
-    public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
+    protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Updating entities: {}", entities);
+        JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString());
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Update entities returned results: {}", results);
+        return results;
+    }
+
+    public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entitiesArray = getEntitiesArray(entities);
-        LOG.debug("Updating entities: {}", entitiesArray);
-        JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
-        try {
-            return response.getJSONArray(GUID);
-        } catch (JSONException e) {
-            throw new AtlasServiceException(API.UPDATE_ENTITY, e);
-        }
+        return updateEntities(entitiesArray);
     }
 
     /**
@@ -651,6 +668,8 @@ public class AtlasClient {
                                Referenceable entity) throws AtlasServiceException {
         final API api = API.UPDATE_ENTITY_PARTIAL;
         String entityJson = InstanceSerialization.toJson(entity, true);
+        LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+                uniqueAttributeName, uniqueAttributeValue, entityJson);
         JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
             @Override
             public WebResource createResource() {
@@ -661,10 +680,16 @@ public class AtlasClient {
                 return resource;
             }
         });
+        String result = getString(response, GUID);
+        LOG.debug("Update entity returned result: {}", result);
+        return result;
+    }
+
+    protected String getString(JSONObject jsonObject, String parameter) throws AtlasServiceException {
         try {
-            return response.getString(GUID);
+            return jsonObject.getString(parameter);
         } catch (JSONException e) {
-            throw new AtlasServiceException(api, e);
+            throw new AtlasServiceException(e);
         }
     }
 
@@ -676,6 +701,7 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+        LOG.debug("Deleting entities: {}", guids);
         JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
             @Override
             public WebResource createResource() {
@@ -687,7 +713,9 @@ public class AtlasClient {
                 return resource;
             }
         });
-        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -699,13 +727,17 @@ public class AtlasClient {
      */
     public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
             throws AtlasServiceException {
+        LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+                uniqueAttributeValue);
         API api = API.DELETE_ENTITY;
         WebResource resource = getResource(api);
         resource = resource.queryParam(TYPE, entityType);
         resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
         resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
         JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
-        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
     }
 
     /**
@@ -789,13 +821,13 @@ public class AtlasClient {
         return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
     }
 
-    private class ExtractOperation<T, U> {
+    protected class ExtractOperation<T, U> {
         T extractElement(U element) throws JSONException {
             return (T) element;
         }
     }
 
-    private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
+    protected <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
             throws AtlasServiceException {
         try {
             JSONArray results = jsonResponse.getJSONArray(key);
@@ -1011,22 +1043,12 @@ public class AtlasClient {
     private class AtlasClientContext {
         private String[] baseUrls;
         private Client client;
-        private final UserGroupInformation ugi;
-        private final String doAsUser;
+        private String doAsUser;
+        private UserGroupInformation ugi;
 
         public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
             this.baseUrls = baseUrls;
             this.client = client;
-            this.ugi = ugi;
-            this.doAsUser = doAsUser;
-        }
-
-        public UserGroupInformation getUgi() {
-            return ugi;
-        }
-
-        public String getDoAsUser() {
-            return doAsUser;
         }
 
         public Client getClient() {
@@ -1036,6 +1058,14 @@ public class AtlasClient {
         public String[] getBaseUrls() {
             return baseUrls;
         }
+
+        public String getDoAsUser() {
+            return doAsUser;
+        }
+
+        public UserGroupInformation getUgi() {
+            return ugi;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasServiceException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasServiceException.java b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
index 6f68a71..2117a6b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasServiceException.java
+++ b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
@@ -19,6 +19,10 @@
 package org.apache.atlas;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.WebApplicationException;
 
 public class AtlasServiceException extends Exception {
     private ClientResponse.Status status;
@@ -27,12 +31,19 @@ public class AtlasServiceException extends Exception {
         super("Metadata service API " + api + " failed", e);
     }
 
+    public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException {
+        this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
+                ((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
+    }
+
+    private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) {
+        super("Metadata service API " + api + " failed with status " + status.getStatusCode() + "(" +
+                status.getReasonPhrase() + ") Response Body (" + response + ")");
+        this.status = status;
+    }
+
     public AtlasServiceException(AtlasClient.API api, ClientResponse response) {
-        super("Metadata service API " + api + " failed with status " +
-                response.getClientResponseStatus().getStatusCode() + "(" +
-                response.getClientResponseStatus().getReasonPhrase() + ") Response Body (" +
-                response.getEntity(String.class) + ")");
-        this.status = response.getClientResponseStatus();
+        this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
     }
 
     public AtlasServiceException(Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
index d3b474a..1686112 100644
--- a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
+++ b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
@@ -20,6 +20,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import org.apache.atlas.AtlasException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
@@ -60,7 +61,7 @@ public class SecureClientUtils {
 
 
     public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
-            org.apache.commons.configuration.Configuration clientConfig, final String doAsUser,
+            org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
             final UserGroupInformation ugi) {
         config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
         Configuration conf = new Configuration();
@@ -80,17 +81,16 @@ public class SecureClientUtils {
         final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
         HttpURLConnectionFactory httpURLConnectionFactory = null;
         try {
-            UserGroupInformation ugiToUse = ugi != null ?
-                ugi : UserGroupInformation.getCurrentUser();
+            UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser();
             final UserGroupInformation actualUgi =
-                (ugiToUse.getAuthenticationMethod() ==
-                 UserGroupInformation.AuthenticationMethod.PROXY)
-                    ? ugiToUse.getRealUser()
-                    : ugiToUse;
-            LOG.info("Real User: {}, is from ticket cache? {}",
-                     actualUgi,
-                     actualUgi.isLoginTicketBased());
+                    (ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY)
+                    ? ugiToUse.getRealUser() : ugiToUse;
+            LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased());
+            if (StringUtils.isEmpty(doAsUser)) {
+                doAsUser = actualUgi.getShortUserName();
+            }
             LOG.info("doAsUser: {}", doAsUser);
+            final String finalDoAsUser = doAsUser;
             httpURLConnectionFactory = new HttpURLConnectionFactory() {
                 @Override
                 public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
@@ -99,9 +99,8 @@ public class SecureClientUtils {
                             @Override
                             public HttpURLConnection run() throws Exception {
                                 try {
-                                    return new DelegationTokenAuthenticatedURL(
-                                        finalAuthenticator, connConfigurator)
-                                        .openConnection(url, token, doAsUser);
+                                    return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator)
+                                        .openConnection(url, token, finalDoAsUser);
                                 } catch (Exception e) {
                                     throw new IOException(e);
                                 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 8911bf5..0e80573 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -30,14 +30,12 @@ import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
-
 import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -326,6 +324,7 @@ public class AtlasClientTest {
                 thenReturn(response);
 
         when(resourceCreator.createResource()).thenReturn(resourceObject);
+        when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");
 
         AtlasClient atlasClient = getClientForTest("http://localhost:31000");
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 1ac4082..17bf68f 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -61,7 +61,7 @@
     </logger>
 
     <root>
-        <priority value="info"/>
+        <priority value="warn"/>
         <appender-ref ref="FILE"/>
     </root>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
deleted file mode 100644
index 1f2df3e..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import kafka.consumer.ConsumerTimeoutException;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.service.Service;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Consumer of notifications from hooks e.g., hive hook etc.
- */
-@Singleton
-public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
-
-    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
-    public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
-    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-
-    private NotificationInterface notificationInterface;
-    private ExecutorService executors;
-    private String atlasEndpoint;
-    private Configuration applicationProperties;
-    private List<HookConsumer> consumers;
-
-    @Inject
-    public NotificationHookConsumer(NotificationInterface notificationInterface) {
-        this.notificationInterface = notificationInterface;
-    }
-
-    @Override
-    public void start() throws AtlasException {
-        Configuration configuration = ApplicationProperties.get();
-        startInternal(configuration, null);
-    }
-
-    void startInternal(Configuration configuration,
-                       ExecutorService executorService) {
-        this.applicationProperties = configuration;
-        this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
-        if (consumers == null) {
-            consumers = new ArrayList<>();
-        }
-        if (executorService != null) {
-            executors = executorService;
-        }
-        if (!HAConfiguration.isHAEnabled(configuration)) {
-            LOG.info("HA is disabled, starting consumers inline.");
-            startConsumers(executorService);
-        }
-    }
-
-    private void startConsumers(ExecutorService executorService) {
-        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
-        if (executorService == null) {
-            executorService = Executors.newFixedThreadPool(notificationConsumers.size());
-        }
-        executors = executorService;
-        for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
-            HookConsumer hookConsumer = new HookConsumer(consumer);
-            consumers.add(hookConsumer);
-            executors.submit(hookConsumer);
-        }
-    }
-
-    @Override
-    public void stop() {
-        //Allow for completion of outstanding work
-        notificationInterface.close();
-        try {
-            if (executors != null) {
-                stopConsumerThreads();
-                executors.shutdownNow();
-                if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                }
-                executors = null;
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Failure in shutting down consumers");
-        }
-    }
-
-    private void stopConsumerThreads() {
-        if (consumers != null) {
-            for (HookConsumer consumer : consumers) {
-                consumer.stop();
-            }
-            consumers.clear();
-        }
-    }
-
-    /**
-     * Start Kafka consumer threads that read from Kafka topic when server is activated.
-     *
-     * Since the consumers create / update entities to the shared backend store, only the active instance
-     * should perform this activity. Hence, these threads are started only on server activation.
-     */
-    @Override
-    public void instanceIsActive() {
-        LOG.info("Reacting to active state: initializing Kafka consumers");
-        startConsumers(executors);
-    }
-
-    /**
-     * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
-     *
-     * Since the consumers create / update entities to the shared backend store, only the active instance
-     * should perform this activity. Hence, these threads are stopped only on server deactivation.
-     */
-    @Override
-    public void instanceIsPassive() {
-        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
-        stop();
-    }
-
-    static class Timer {
-        public void sleep(int interval) throws InterruptedException {
-            Thread.sleep(interval);
-        }
-    }
-
-    class HookConsumer implements Runnable {
-        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
-        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-
-        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
-            this.consumer = consumer;
-        }
-
-        private boolean hasNext() {
-            try {
-                return consumer.hasNext();
-            } catch (ConsumerTimeoutException e) {
-                return false;
-            }
-        }
-
-        @Override
-        public void run() {
-            shouldRun.set(true);
-
-            if (!serverAvailable(new NotificationHookConsumer.Timer())) {
-                return;
-            }
-
-            while (shouldRun.get()) {
-                try {
-                    if (hasNext()) {
-                        HookNotification.HookNotificationMessage message = consumer.next();
-                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
-                        AtlasClient atlasClient = getAtlasClient(ugi);
-
-                        try {
-                            switch (message.getType()) {
-                            case ENTITY_CREATE:
-                                HookNotification.EntityCreateRequest createRequest =
-                                        (HookNotification.EntityCreateRequest) message;
-                                atlasClient.createEntity(createRequest.getEntities());
-                                break;
-
-                            case ENTITY_PARTIAL_UPDATE:
-                                HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
-                                        (HookNotification.EntityPartialUpdateRequest) message;
-                                atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                        partialUpdateRequest.getAttribute(),
-                                        partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
-                                break;
-
-                            case ENTITY_DELETE:
-                                HookNotification.EntityDeleteRequest deleteRequest =
-                                    (HookNotification.EntityDeleteRequest) message;
-                                atlasClient.deleteEntity(deleteRequest.getTypeName(),
-                                    deleteRequest.getAttribute(),
-                                    deleteRequest.getAttributeValue());
-                                break;
-
-                            case ENTITY_FULL_UPDATE:
-                                HookNotification.EntityUpdateRequest updateRequest =
-                                        (HookNotification.EntityUpdateRequest) message;
-                                atlasClient.updateEntities(updateRequest.getEntities());
-                                break;
-
-                            default:
-                                throw new IllegalStateException("Unhandled exception!");
-                            }
-                        } catch (Exception e) {
-                            //todo handle failures
-                            LOG.warn("Error handling message {}", message, e);
-                        }
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Failure in NotificationHookConsumer", t);
-                }
-            }
-        }
-
-        protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-            return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
-        }
-
-        boolean serverAvailable(Timer timer) {
-            try {
-                AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
-                while (!atlasClient.isServerReady()) {
-                    try {
-                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
-                                SERVER_READY_WAIT_TIME_MS);
-                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
-                    } catch (InterruptedException e) {
-                        LOG.info("Interrupted while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
-                        return false;
-                    }
-                }
-            } catch (Throwable e) {
-                LOG.info(
-                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
-                return false;
-            }
-            LOG.info("Atlas Server is ready, can start reading Kafka events.");
-            return true;
-        }
-
-        public void stop() {
-            shouldRun.set(false);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
deleted file mode 100644
index 177de6d..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import static org.mockito.Mockito.*;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
-
-public class NotificationHookConsumerTest {
-
-    @Mock
-    private NotificationInterface notificationInterface;
-
-    @Mock
-    private AtlasClient atlasClient;
-
-    @Mock
-    private Configuration configuration;
-
-    @Mock
-    private ExecutorService executorService;
-
-    @BeforeMethod
-    public void setup() {
-        MockitoAnnotations.initMocks(this);
-    }
-
-    @Test
-    public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenReturn(true);
-
-        assertTrue(hookConsumer.serverAvailable(timer));
-
-        verifyZeroInteractions(timer);
-    }
-
-    @Test
-    public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
-
-        assertTrue(hookConsumer.serverAvailable(timer));
-
-        verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
-    }
-
-    @Test
-    public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
-        when(atlasClient.isServerReady()).thenReturn(false);
-
-        assertFalse(hookConsumer.serverAvailable(timer));
-    }
-
-    @Test
-    public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
-                    @Override
-                    protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
-                        return atlasClient;
-                    }
-                };
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
-        when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
-                new Exception()));
-
-        assertFalse(hookConsumer.serverAvailable(timer));
-    }
-
-    @Test
-    public void testConsumersStartedIfHAIsDisabled() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
-        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
-    }
-
-    @Test
-    public void testConsumersAreNotStartedIfHAIsEnabled() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        verifyZeroInteractions(notificationInterface);
-    }
-
-    @Test
-    public void testConsumersAreStartedWhenInstanceBecomesActive() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        notificationHookConsumer.instanceIsActive();
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
-        verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
-    }
-
-    @Test
-    public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
-        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
-        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
-        notificationHookConsumer.startInternal(configuration, executorService);
-        notificationHookConsumer.instanceIsPassive();
-        verify(notificationInterface).close();
-        verify(executorService).shutdownNow();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 27d44cf..7b872c3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1371,7 +1371,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-war-plugin</artifactId>
-                    <version>2.4</version>
+                    <version>2.6</version>
                 </plugin>
 
                 <plugin>
@@ -1657,15 +1657,17 @@
                         <exclude>**/overlays/**</exclude>
                         <exclude>dev-support/**</exclude>
                         <exclude>**/users-credentials.properties</exclude>
-			<exclude>**/public/css/animate.min.css</exclude>
-			<exclude>**/public/css/fonts/**</exclude>
-			<exclude>**/public/css/font-awesome.min.css</exclude>
-			<exclude>**/public/js/require-handlebars-plugin/**</exclude>
-			<exclude>**/node_modules/**</exclude>
-			<!-- All the npm plugins are copied here, so exclude it -->
-			<exclude>**/public/js/libs/**</exclude>
-  
-
+                        <exclude>**/public/css/animate.min.css</exclude>
+                        <exclude>**/public/css/fonts/**</exclude>
+                        <exclude>**/public/css/font-awesome.min.css</exclude>
+                        <exclude>**/public/js/require-handlebars-plugin/**</exclude>
+                        <exclude>**/node_modules/**</exclude>
+                        <!-- All the npm plugins are copied here, so exclude it -->
+                        <exclude>**/public/js/libs/**</exclude>
+
+                        <!-- atlas data directory creates when tests are run from IDE -->
+                        <exclude>**/atlas.data/**</exclude>
+                        <exclude>**/${sys:atlas.data}/**</exclude>
                     </excludes>
                 </configuration>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a2de92a..7c17a9d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
 ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth)
 ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth)
 ATLAS-659 atlas_start fails on Windows (dkantor via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index d83c08c..9b9fe35 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -106,6 +106,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             // create a composite index for entity state
             createCompositeAndMixedIndex(management, Constants.STATE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
 
+            // create a composite index for entity state
+            createCompositeAndMixedIndex(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
+            // create a composite index for entity state
+            createCompositeAndMixedIndex(management, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
             // create a composite and mixed index for type since it can be combined with other keys
             createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
                     true);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 393863c..de48c15 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -301,6 +301,7 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-war-plugin</artifactId>
                 <configuration>
+                    <archiveClasses>true</archiveClasses>
                     <attachClasses>true</attachClasses>
                     <overlays>
                         <!-- <overlay>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
new file mode 100644
index 0000000..c6ed85d
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.inject.Inject;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.web.filters.AuditFilter;
+import org.apache.atlas.web.resources.EntityResource;
+import org.apache.atlas.web.service.ServiceState;
+import org.apache.atlas.web.util.DateTimeHelper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer.
+ */
+public class LocalAtlasClient extends AtlasClient {
+    private static final String LOCALHOST = "localhost";
+    private static final String CLASS =  LocalAtlasClient.class.getSimpleName();
+
+    public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class);
+
+    private final EntityResource entityResource;
+
+    private final ServiceState serviceState;
+
+    @Inject
+    public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) {
+        super();
+        this.serviceState = serviceState;
+        this.entityResource = entityResource;
+    }
+
+    private String user;
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    private void setRequestContext() {
+        RequestContext requestContext = RequestContext.createContext();
+        requestContext.setUser(user);
+    }
+
+    @Override
+    public boolean isServerReady() throws AtlasServiceException {
+        return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
+    }
+
+    @Override
+    protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Creating entities: {}", entities);
+        EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.submit(new LocalServletRequest(entities.toString()));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Create entities returned results: {}", results);
+        return results;
+    }
+
+    @Override
+    protected List<String> updateEntities(final JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Updating entities: {}", entities);
+        EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.updateEntities(new LocalServletRequest(entities.toString()));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Update entities returned results: {}", results);
+        return results;
+    }
+
+    private abstract class EntityOperation {
+        private final API api;
+
+        public EntityOperation(API api) {
+            this.api = api;
+        }
+
+        public JSONObject run() throws AtlasServiceException {
+            setRequestContext();
+            AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
+
+            try {
+                Response response = invoke();
+                return (JSONObject) response.getEntity();
+            } catch(WebApplicationException e) {
+                try {
+                    throw new AtlasServiceException(api, e);
+                } catch (JSONException e1) {
+                    throw new AtlasServiceException(e);
+                }
+            }
+        }
+
+        abstract Response invoke();
+    }
+
+    @Override
+    public String updateEntity(final String entityType, final String uniqueAttributeName,
+                               final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
+        final String entityJson = InstanceSerialization.toJson(entity, true);
+        LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+                uniqueAttributeName, uniqueAttributeValue, entityJson);
+        EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) {
+            @Override
+            Response invoke() {
+                return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue,
+                        new LocalServletRequest(entityJson));
+            }
+        };
+        JSONObject response = entityOperation.run();
+        String result = getString(response, GUID);
+        LOG.debug("Update entity returned result: {}", result);
+        return result;
+    }
+
+    @Override
+    public List<String> deleteEntity(final String entityType, final String uniqueAttributeName,
+                                     final String uniqueAttributeValue) throws AtlasServiceException {
+        LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+                uniqueAttributeValue);
+        EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) {
+            @Override
+            Response invoke() {
+                return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue);
+            }
+        };
+        JSONObject response = entityOperation.run();
+        List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+        LOG.debug("Delete entities returned results: {}", results);
+        return results;
+    }
+
+    @Override
+    public String getAdminStatus() throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> createType(String typeAsJson) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> updateType(String typeAsJson) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> listTypes() throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public TypesDef getType(String typeName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+
+    @Override
+    public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public Referenceable getEntity(String guid) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public Referenceable getEntity(final String entityType, final String attribute, final String value)
+            throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<String> listEntities(final String entityType) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
+            throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray search(final String searchQuery) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray searchByDSL(final String query) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject searchByFullText(final String query) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+
+    @Override
+    public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
+        throw new IllegalStateException("Not supported in LocalAtlasClient");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
new file mode 100644
index 0000000..36a01b2
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
+import javax.servlet.http.Part;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+
+public class LocalServletRequest implements HttpServletRequest {
+    private final String payload;
+
+    LocalServletRequest(String payload) {
+        this.payload = payload;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+
+    @Override
+    public String getAuthType() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Cookie[] getCookies() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public long getDateHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getHeaders(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getHeaderNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getIntHeader(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getMethod() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getPathInfo() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getPathTranslated() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getContextPath() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getQueryString() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRemoteUser() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isUserInRole(String role) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Principal getUserPrincipal() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRequestedSessionId() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRequestURI() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public StringBuffer getRequestURL() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getServletPath() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public HttpSession getSession(boolean create) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public HttpSession getSession() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String changeSessionId() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdValid() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromCookie() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromURL() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isRequestedSessionIdFromUrl() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean authenticate(HttpServletResponse response) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void login(String username, String password) throws ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void logout() throws ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Collection<Part> getParts() throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Part getPart(String name) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Object getAttribute(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getAttributeNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getCharacterEncoding() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getContentLength() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public long getContentLengthLong() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getContentType() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public ServletInputStream getInputStream() throws IOException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getParameter(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<String> getParameterNames() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String[] getParameterValues(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Map<String, String[]> getParameterMap() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getProtocol() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getScheme() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getServerName() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getServerPort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public BufferedReader getReader() throws IOException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRemoteAddr() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRemoteHost() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void setAttribute(String name, Object o) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public void removeAttribute(String name) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Locale getLocale() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public Enumeration<Locale> getLocales() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isSecure() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public RequestDispatcher getRequestDispatcher(String path) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getRealPath(String path) {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getRemotePort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getLocalName() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public String getLocalAddr() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public int getLocalPort() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public ServletContext getServletContext() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public AsyncContext startAsync() throws IllegalStateException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
+            throws IllegalStateException {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isAsyncStarted() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public boolean isAsyncSupported() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public AsyncContext getAsyncContext() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    @Override
+    public DispatcherType getDispatcherType() {
+        throw new IllegalStateException("Not supported");
+    }
+}