You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/06 07:38:58 UTC
[1/2] incubator-atlas git commit: ATLAS-585 NotificationHookConsumer
creates new AtlasClient for every message (shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 334429a83 -> 1e3029bc7
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
index 79b8124..64e6e92 100755
--- a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
+++ b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
@@ -21,7 +21,6 @@ package org.apache.atlas.examples;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
@@ -112,9 +111,9 @@ public class QuickStart {
private final AtlasClient metadataServiceClient;
- QuickStart(String baseUrl) {
+ QuickStart(String baseUrl) throws AtlasException {
String[] urls = baseUrl.split(",");
- metadataServiceClient = new AtlasClient(null, null, urls);
+ metadataServiceClient = new AtlasClient(urls);
}
void createTypes() throws Exception {
@@ -292,11 +291,11 @@ public class QuickStart {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
- JSONArray guids = metadataServiceClient.createEntity(entityJSON);
+ List<String> guids = metadataServiceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the Id for created instance with guid
- return new Id(guids.getString(guids.length()-1), referenceable.getId().getVersion(),
+ return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
new file mode 100644
index 0000000..8ef2f64
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import kafka.consumer.ConsumerTimeoutException;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consumer of notifications from hooks e.g., hive hook etc.
+ */
+@Singleton
+public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+ private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
+
+ public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+ public static final int SERVER_READY_WAIT_TIME_MS = 1000;
+ private final LocalAtlasClient atlasClient;
+
+ private NotificationInterface notificationInterface;
+ private ExecutorService executors;
+ private Configuration applicationProperties;
+ private List<HookConsumer> consumers;
+
+ @Inject
+ public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) {
+ this.notificationInterface = notificationInterface;
+ this.atlasClient = atlasClient;
+ }
+
+ @Override
+ public void start() throws AtlasException {
+ Configuration configuration = ApplicationProperties.get();
+ startInternal(configuration, null);
+ }
+
+ void startInternal(Configuration configuration,
+ ExecutorService executorService) {
+ this.applicationProperties = configuration;
+ if (consumers == null) {
+ consumers = new ArrayList<>();
+ }
+ if (executorService != null) {
+ executors = executorService;
+ }
+ if (!HAConfiguration.isHAEnabled(configuration)) {
+ LOG.info("HA is disabled, starting consumers inline.");
+ startConsumers(executorService);
+ }
+ }
+
+ private void startConsumers(ExecutorService executorService) {
+ int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+ List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
+ notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+ if (executorService == null) {
+ executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
+ new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+ }
+ executors = executorService;
+ for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
+ HookConsumer hookConsumer = new HookConsumer(consumer);
+ consumers.add(hookConsumer);
+ executors.submit(hookConsumer);
+ }
+ }
+
+ @Override
+ public void stop() {
+ //Allow for completion of outstanding work
+ notificationInterface.close();
+ try {
+ if (executors != null) {
+ stopConsumerThreads();
+ executors.shutdownNow();
+ if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ }
+ executors = null;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Failure in shutting down consumers");
+ }
+ }
+
+ private void stopConsumerThreads() {
+ if (consumers != null) {
+ for (HookConsumer consumer : consumers) {
+ consumer.stop();
+ }
+ consumers.clear();
+ }
+ }
+
+ /**
+ * Start Kafka consumer threads that read from Kafka topic when server is activated.
+ *
+ * Since the consumers create / update entities to the shared backend store, only the active instance
+ * should perform this activity. Hence, these threads are started only on server activation.
+ */
+ @Override
+ public void instanceIsActive() {
+ LOG.info("Reacting to active state: initializing Kafka consumers");
+ startConsumers(executors);
+ }
+
+ /**
+ * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
+ *
+ * Since the consumers create / update entities to the shared backend store, only the active instance
+ * should perform this activity. Hence, these threads are stopped only on server deactivation.
+ */
+ @Override
+ public void instanceIsPassive() {
+ LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+ stop();
+ }
+
+ static class Timer {
+ public void sleep(int interval) throws InterruptedException {
+ Thread.sleep(interval);
+ }
+ }
+
+ class HookConsumer implements Runnable {
+ private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
+ private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
+ public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+ this.consumer = consumer;
+ }
+
+ private boolean hasNext() {
+ try {
+ return consumer.hasNext();
+ } catch (ConsumerTimeoutException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void run() {
+ shouldRun.set(true);
+
+ if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+ return;
+ }
+
+ while (shouldRun.get()) {
+ try {
+ if (hasNext()) {
+ HookNotification.HookNotificationMessage message = consumer.next();
+ atlasClient.setUser(message.getUser());
+ try {
+ switch (message.getType()) {
+ case ENTITY_CREATE:
+ HookNotification.EntityCreateRequest createRequest =
+ (HookNotification.EntityCreateRequest) message;
+ atlasClient.createEntity(createRequest.getEntities());
+ break;
+
+ case ENTITY_PARTIAL_UPDATE:
+ HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+ (HookNotification.EntityPartialUpdateRequest) message;
+ atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+ partialUpdateRequest.getAttribute(),
+ partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
+ break;
+
+ case ENTITY_DELETE:
+ HookNotification.EntityDeleteRequest deleteRequest =
+ (HookNotification.EntityDeleteRequest) message;
+ atlasClient.deleteEntity(deleteRequest.getTypeName(),
+ deleteRequest.getAttribute(),
+ deleteRequest.getAttributeValue());
+ break;
+
+ case ENTITY_FULL_UPDATE:
+ HookNotification.EntityUpdateRequest updateRequest =
+ (HookNotification.EntityUpdateRequest) message;
+ atlasClient.updateEntities(updateRequest.getEntities());
+ break;
+
+ default:
+ throw new IllegalStateException("Unhandled exception!");
+ }
+ } catch (Exception e) {
+ //todo handle failures
+ LOG.warn("Error handling message {}", message, e);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failure in NotificationHookConsumer", t);
+ }
+ }
+ }
+
+ boolean serverAvailable(Timer timer) {
+ try {
+ while (!atlasClient.isServerReady()) {
+ try {
+ LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+ SERVER_READY_WAIT_TIME_MS);
+ timer.sleep(SERVER_READY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while waiting for Atlas Server to become ready, "
+ + "exiting consumer thread.", e);
+ return false;
+ }
+ }
+ } catch (Throwable e) {
+ LOG.info(
+ "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
+ + "exiting consumer thread.", e);
+ return false;
+ }
+ LOG.info("Atlas Server is ready, can start reading Kafka events.");
+ return true;
+ }
+
+ public void stop() {
+ shouldRun.set(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index 01b1cd3..2d84b10 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -25,7 +25,6 @@ import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
@@ -57,10 +56,6 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
static final String PREFIX = "atlas.http.authentication";
- /**
- * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
- * before invoking the actual resource.
- */
private HttpServlet optionsServlet;
/**
@@ -128,47 +123,45 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
@Override
public void doFilter(final ServletRequest request, final ServletResponse response,
final FilterChain filterChain) throws IOException, ServletException {
-
FilterChain filterChainWrapper = new FilterChain() {
-
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
throws IOException, ServletException {
- HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+ final HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
- if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+ if (httpRequest.getMethod().equals("OPTIONS")) {
optionsServlet.service(request, response);
+
} else {
- final String user = Servlets.getUserFromRequest(httpRequest);
- if (StringUtils.isEmpty(user)) {
- ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
- "Param user.name can't be empty");
- } else {
- try {
- NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
- RequestContext requestContext = RequestContext.get();
- requestContext.setUser(user);
- LOG.info("Request from authenticated user: {}, URL={}", user,
- Servlets.getRequestURI(httpRequest));
-
- filterChain.doFilter(servletRequest, servletResponse);
- } finally {
- NDC.pop();
- }
+ try {
+ String requestUser = httpRequest.getRemoteUser();
+ NDC.push(requestUser + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
+ RequestContext requestContext = RequestContext.get();
+ requestContext.setUser(requestUser);
+ LOG.info("Request from authenticated user: {}, URL={}", requestUser,
+ Servlets.getRequestURI(httpRequest));
+
+ filterChain.doFilter(servletRequest, servletResponse);
+ } finally {
+ NDC.pop();
}
}
}
};
- super.doFilter(request, response, filterChainWrapper);
+ try {
+ super.doFilter(request, response, filterChainWrapper);
+ } catch (NullPointerException e) {
+ //PseudoAuthenticationHandler.getUserName() from hadoop-auth throws NPE if user name is not specified
+ ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+ "Authentication is enabled and user is not specified. Specify user.name parameter");
+ }
}
+
@Override
public void destroy() {
- if (optionsServlet != null) {
- optionsServlet.destroy();
- }
-
+ optionsServlet.destroy();
super.destroy();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 9d60e1a..eeaddd6 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -99,7 +99,7 @@ public class AuditFilter implements Filter {
return userFromRequest == null ? "UNKNOWN" : userFromRequest;
}
- private void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
+ public static void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
String whenISO9601) {
AUDIT_LOG.info("Audit: {}/{}-{} performed request {} {} ({}) at time {}", who, fromAddress, fromHost, whatRequest, whatURL,
whatAddrs, whenISO9601);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index 1eca174..010fa2a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -58,7 +58,6 @@ public class GuiceServletConfig extends GuiceServletContextListener {
private static final Logger LOG = LoggerFactory.getLogger(GuiceServletConfig.class);
private static final String GUICE_CTX_PARAM = "guice.packages";
- static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
protected volatile Injector injector;
@Override
@@ -126,7 +125,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
if (configuration == null) {
throw new ConfigurationException("Could not load application configuration");
}
- if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
+ if (Boolean.valueOf(configuration.getString(AtlasClient.HTTP_AUTHENTICATION_ENABLED))) {
LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 6068007..36b7607 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -19,6 +19,7 @@
package org.apache.atlas.web.resources;
import com.google.inject.Inject;
+import org.apache.atlas.AtlasClient;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.ConfigurationException;
@@ -113,7 +114,7 @@ public class AdminResource {
public Response getStatus() {
JSONObject responseData = new JSONObject();
try {
- responseData.put("Status", serviceState.getState().toString());
+ responseData.put(AtlasClient.STATUS, serviceState.getState().toString());
Response response = Response.ok(responseData).build();
return response;
} catch (JSONException e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index b14aa80..709fec5 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -330,7 +330,6 @@ public class EntityResource {
* @param entityType the entity type
* @param attribute the unique attribute used to identify the entity
* @param value the unique attribute value used to identify the entity
- * @param request - Ignored
* @return response payload as json - including guids of entities(including composite references from that entity) that were deleted
*/
@DELETE
@@ -338,8 +337,7 @@ public class EntityResource {
public Response deleteEntities(@QueryParam("guid") List<String> guids,
@QueryParam("type") String entityType,
@QueryParam("property") String attribute,
- @QueryParam("value") String value,
- @Context HttpServletRequest request) {
+ @QueryParam("value") String value) {
try {
List<String> deletedGuids = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
index 480a232..b4f0839 100755
--- a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
+++ b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java
@@ -19,10 +19,13 @@
package org.apache.atlas.web.util;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.LocalServletRequest;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -34,6 +37,8 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
/**
* Utility functions for dealing with servlets.
@@ -70,6 +75,28 @@ public final class Servlets {
return user;
}
+ user = getDoAsUser(httpRequest);
+ if (!StringUtils.isEmpty(user)) {
+ return user;
+ }
+
+ return null;
+ }
+
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+ private static final String DO_AS = "doAs";
+
+ public static String getDoAsUser(HttpServletRequest request) {
+ if (StringUtils.isNoneEmpty(request.getQueryString())) {
+ List<NameValuePair> list = URLEncodedUtils.parse(request.getQueryString(), UTF8_CHARSET);
+ if (list != null) {
+ for (NameValuePair nv : list) {
+ if (DO_AS.equals(nv.getName())) {
+ return nv.getValue();
+ }
+ }
+ }
+ }
return null;
}
@@ -134,6 +161,11 @@ public final class Servlets {
}
public static String getRequestPayload(HttpServletRequest request) throws IOException {
+ //request is an instance of LocalServletRequest for calls from LocalAtlasClient
+ if (request instanceof LocalServletRequest) {
+ return ((LocalServletRequest) request).getPayload();
+ }
+
StringWriter writer = new StringWriter();
IOUtils.copy(request.getInputStream(), writer);
return writer.toString();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
new file mode 100644
index 0000000..7f20652
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.web.resources.EntityResource;
+import org.apache.atlas.web.service.ServiceState;
+import org.apache.commons.lang.RandomStringUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class LocalAtlasClientTest {
+ @Mock
+ private EntityResource entityResource;
+
+ @Mock
+ private ServiceState serviceState;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testCreateEntity() throws Exception {
+ Response response = mock(Response.class);
+ when(entityResource.submit(any(HttpServletRequest.class))).thenReturn(response);
+ final String guid = random();
+ when(response.getEntity()).thenReturn(new JSONObject() {{
+ put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
+ }});
+
+ LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
+ List<String> results = atlasClient.createEntity(new Referenceable(random()));
+ assertEquals(results.size(), 1);
+ assertEquals(results.get(0), guid);
+ }
+
+ @Test
+ public void testException() throws Exception {
+ LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
+
+ Response response = mock(Response.class);
+ when(entityResource.submit(any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
+ when(response.getEntity()).thenReturn(new JSONObject() {{
+ put("stackTrace", "stackTrace");
+ }});
+ when(response.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode());
+ try {
+ atlasClient.createEntity(new Referenceable(random()));
+ fail("Expected AtlasServiceException");
+ } catch(AtlasServiceException e) {
+ assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
+ }
+
+ when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
+ any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
+ when(response.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode());
+ try {
+ atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
+ fail("Expected AtlasServiceException");
+ } catch(AtlasServiceException e) {
+ assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
+ }
+
+ }
+
+ @Test
+ public void testIsServerReady() throws Exception {
+ when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
+ LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
+ assertTrue(atlasClient.isServerReady());
+
+ when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
+ assertFalse(atlasClient.isServerReady());
+ }
+
+ @Test
+ public void testUpdateEntity() throws Exception {
+ final String guid = random();
+ Response response = mock(Response.class);
+ when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
+ any(HttpServletRequest.class))).thenReturn(response);
+ when(response.getEntity()).thenReturn(new JSONObject() {{
+ put(AtlasClient.GUID, guid);
+ }});
+
+ LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
+ String actualId = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
+ assertEquals(actualId, guid);
+ }
+
+ @Test
+ public void testDeleteEntity() throws Exception {
+ final String guid = random();
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new JSONObject() {{
+ put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
+ }});
+
+ when(entityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response);
+ LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
+ List<String> results = atlasClient.deleteEntity(random(), random(), random());
+ assertEquals(results.size(), 1);
+ assertEquals(results.get(0), guid);
+ }
+
+ private String random() {
+ return RandomStringUtils.randomAlphanumeric(10);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 03a0d3f..72f403e 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -19,6 +19,7 @@
package org.apache.atlas.notification;
import com.google.inject.Inject;
+import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
@@ -29,6 +30,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import java.util.List;
+
import static org.testng.Assert.assertEquals;
@Guice(modules = NotificationModule.class)
@@ -55,6 +58,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
}
@Test
+ public void testMessageHandleFailureConsumerContinues() throws Exception {
+ //send invalid message - update with invalid type
+ sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
+ new Referenceable(randomString())));
+
+ //send valid message
+ final Referenceable entity = new Referenceable(DATABASE_TYPE);
+ entity.set("name", "db" + randomString());
+ entity.set("description", randomString());
+ sendHookMessage(new HookNotification.EntityCreateRequest(TEST_USER, entity));
+
+ waitFor(MAX_WAIT_TIME, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+ entity.get("name")));
+ return results.length() == 1;
+ }
+ });
+ }
+
+ @Test
public void testCreateEntity() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE);
entity.set("name", "db" + randomString());
@@ -70,6 +95,13 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
return results.length() == 1;
}
});
+
+ //Assert that user passed in hook message is used in audit
+ Referenceable instance = serviceClient.getEntity(DATABASE_TYPE, "name", (String) entity.get("name"));
+ List<EntityAuditEvent> events =
+ serviceClient.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0).getUser(), TEST_USER);
}
@Test
@@ -132,7 +164,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final String dbName = "db" + randomString();
entity.set("name", dbName);
entity.set("description", randomString());
- final String dbId = serviceClient.createEntity(entity).getString(0);
+ final String dbId = serviceClient.createEntity(entity).get(0);
sendHookMessage(
new HookNotification.EntityDeleteRequest(TEST_USER, DATABASE_TYPE, "name", dbName));
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
new file mode 100644
index 0000000..8765826
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class NotificationHookConsumerTest {
+
+ @Mock
+ private NotificationInterface notificationInterface;
+
+ @Mock
+ private LocalAtlasClient atlasClient;
+
+ @Mock
+ private Configuration configuration;
+
+ @Mock
+ private ExecutorService executorService;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenReturn(true);
+
+ assertTrue(hookConsumer.serverAvailable(timer));
+
+ verifyZeroInteractions(timer);
+ }
+
+ @Test
+ public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
+
+ assertTrue(hookConsumer.serverAvailable(timer));
+
+ verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+ }
+
+ @Test
+ public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+ when(atlasClient.isServerReady()).thenReturn(false);
+
+ assertFalse(hookConsumer.serverAvailable(timer));
+ }
+
+ @Test
+ public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
+ new Exception()));
+
+ assertFalse(hookConsumer.serverAvailable(timer));
+ }
+
+ @Test
+ public void testConsumersStartedIfHAIsDisabled() {
+ when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
+ when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ consumers.add(mock(NotificationConsumer.class));
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+ thenReturn(consumers);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ notificationHookConsumer.startInternal(configuration, executorService);
+ verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+ verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+ }
+
+ @Test
+ public void testConsumersAreNotStartedIfHAIsEnabled() {
+ when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+ when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ consumers.add(mock(NotificationConsumer.class));
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+ thenReturn(consumers);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ notificationHookConsumer.startInternal(configuration, executorService);
+ verifyZeroInteractions(notificationInterface);
+ }
+
+ @Test
+ public void testConsumersAreStartedWhenInstanceBecomesActive() {
+ when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+ when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ consumers.add(mock(NotificationConsumer.class));
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+ thenReturn(consumers);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ notificationHookConsumer.startInternal(configuration, executorService);
+ notificationHookConsumer.instanceIsActive();
+ verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+ verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+ }
+
+ @Test
+ public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
+ when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+ when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+ List<NotificationConsumer<Object>> consumers = new ArrayList();
+ consumers.add(mock(NotificationConsumer.class));
+ when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
+ thenReturn(consumers);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
+ notificationHookConsumer.startInternal(configuration, executorService);
+ notificationHookConsumer.instanceIsPassive();
+ verify(notificationInterface).close();
+ verify(executorService).shutdownNow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
deleted file mode 100644
index 9e1e08f..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.webapp.WebAppContext;
-import org.testng.annotations.Test;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- *
- */
-public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
- public static final String TEST_USER_JAAS_SECTION = "TestUser";
- public static final String TESTUSER = "testuser";
- public static final String TESTPASS = "testpass";
-
- private File userKeytabFile;
- private File httpKeytabFile;
-
- class TestEmbeddedServer extends EmbeddedServer {
- public TestEmbeddedServer(int port, String path) throws IOException {
- super(port, path);
- }
-
- Server getServer() {
- return server;
- }
-
- @Override
- protected WebAppContext getWebAppContext(String path) {
- WebAppContext application = new WebAppContext(path, "/");
- application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
- application.setClassLoader(Thread.currentThread().getContextClassLoader());
- return application;
- }
- }
-
- @Test(enabled = false)
- public void testKerberosBasedLogin() throws Exception {
- String originalConf = System.getProperty("atlas.conf");
- System.setProperty("atlas.conf", System.getProperty("user.dir"));
-
- setupKDCAndPrincipals();
- TestEmbeddedServer server = null;
-
- try {
- // setup the atlas-application.properties file
- generateKerberosTestProperties();
-
- // need to create the web application programmatically in order to control the injection of the test
- // application properties
- server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
-
- startEmbeddedServer(server.getServer());
-
- final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
- // attempt to hit server and get rejected
- URL url = new URL("http://localhost:23000/");
- HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
- connection.setRequestMethod("GET");
- connection.connect();
-
- assertEquals(connection.getResponseCode(), 401);
-
- // need to populate the ticket cache with a local user, so logging in...
- Subject subject = loginTestUser();
-
- Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- // attempt to hit server and get rejected
- URL url = new URL("http://localhost:23000/");
- HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
- connection.setRequestMethod("GET");
- connection.connect();
-
- assertEquals(connection.getResponseCode(), 200);
- assertEquals(RequestContext.get().getUser(), TESTUSER);
- return null;
- }
- });
- } finally {
- server.getServer().stop();
- kdc.stop();
-
- if (originalConf != null) {
- System.setProperty("atlas.conf", originalConf);
- } else {
- System.clearProperty("atlas.conf");
- }
-
- }
-
-
- }
-
- protected Subject loginTestUser() throws LoginException, IOException {
- LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
-
- @Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (int i = 0; i < callbacks.length; i++) {
- if (callbacks[i] instanceof PasswordCallback) {
- PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
- passwordCallback.setPassword(TESTPASS.toCharArray());
- }
- if (callbacks[i] instanceof NameCallback) {
- NameCallback nameCallback = (NameCallback) callbacks[i];
- nameCallback.setName(TESTUSER);
- }
- }
- }
- });
- // attempt authentication
- lc.login();
- return lc.getSubject();
- }
-
- protected void generateKerberosTestProperties() throws IOException, ConfigurationException {
- Properties props = new Properties();
- props.setProperty("atlas.http.authentication.enabled", "true");
- props.setProperty("atlas.http.authentication.type", "kerberos");
- props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
- props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
- props.setProperty("atlas.http.authentication.kerberos.name.rules",
- "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
-
- generateTestProperties(props);
- }
-
- public void setupKDCAndPrincipals() throws Exception {
- // set up the KDC
- File kdcWorkDir = startKDC();
-
- userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
- httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
-
- // create a test user principal
- kdc.createPrincipal(TESTUSER, TESTPASS);
-
- StringBuilder jaas = new StringBuilder(1024);
- jaas.append("TestUser {\n" +
- " com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
- "};\n");
- jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
- jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
-
- File jaasFile = new File(kdcWorkDir, "jaas.txt");
- FileUtils.write(jaasFile, jaas.toString());
- bindJVMtoJAASFile(jaasFile);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
new file mode 100644
index 0000000..f85892a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.testng.annotations.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ */
+public class AtlasAuthenticationKerberosFilterTest extends BaseSecurityTest {
+ public static final String TEST_USER_JAAS_SECTION = "TestUser";
+ public static final String TESTUSER = "testuser";
+ public static final String TESTPASS = "testpass";
+
+ private File userKeytabFile;
+ private File httpKeytabFile;
+
+ class TestEmbeddedServer extends EmbeddedServer {
+ public TestEmbeddedServer(int port, String path) throws IOException {
+ super(port, path);
+ }
+
+ Server getServer() {
+ return server;
+ }
+
+ @Override
+ protected WebAppContext getWebAppContext(String path) {
+ WebAppContext application = new WebAppContext(path, "/");
+ application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+ application.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return application;
+ }
+ }
+
+ @Test(enabled = false)
+ public void testKerberosBasedLogin() throws Exception {
+ String originalConf = System.getProperty("atlas.conf");
+
+ setupKDCAndPrincipals();
+ TestEmbeddedServer server = null;
+
+ try {
+ // setup the atlas-application.properties file
+ String confDirectory = generateKerberosTestProperties();
+ System.setProperty("atlas.conf", confDirectory);
+
+ // need to create the web application programmatically in order to control the injection of the test
+ // application properties
+ server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
+
+ startEmbeddedServer(server.getServer());
+
+ final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
+ // attempt to hit server and get rejected
+ URL url = new URL("http://localhost:23000/");
+ HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), 401);
+
+ // need to populate the ticket cache with a local user, so logging in...
+ Subject subject = loginTestUser();
+
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // attempt to hit server and get rejected
+ URL url = new URL("http://localhost:23000/");
+ HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), 200);
+ assertEquals(RequestContext.get().getUser(), TESTUSER);
+ return null;
+ }
+ });
+ } finally {
+ server.getServer().stop();
+ kdc.stop();
+
+ if (originalConf != null) {
+ System.setProperty("atlas.conf", originalConf);
+ } else {
+ System.clearProperty("atlas.conf");
+ }
+
+ }
+ }
+
+ protected Subject loginTestUser() throws LoginException, IOException {
+ LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (int i = 0; i < callbacks.length; i++) {
+ if (callbacks[i] instanceof PasswordCallback) {
+ PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
+ passwordCallback.setPassword(TESTPASS.toCharArray());
+ }
+ if (callbacks[i] instanceof NameCallback) {
+ NameCallback nameCallback = (NameCallback) callbacks[i];
+ nameCallback.setName(TESTUSER);
+ }
+ }
+ }
+ });
+ // attempt authentication
+ lc.login();
+ return lc.getSubject();
+ }
+
+ protected String generateKerberosTestProperties() throws Exception {
+ PropertiesConfiguration props = new PropertiesConfiguration();
+ props.setProperty("atlas.http.authentication.enabled", "true");
+ props.setProperty("atlas.http.authentication.type", "kerberos");
+ props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
+ props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
+ props.setProperty("atlas.http.authentication.kerberos.name.rules",
+ "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
+
+ return writeConfiguration(props);
+ }
+
+ public void setupKDCAndPrincipals() throws Exception {
+ // set up the KDC
+ File kdcWorkDir = startKDC();
+
+ userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
+ httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
+
+ // create a test user principal
+ kdc.createPrincipal(TESTUSER, TESTPASS);
+
+ StringBuilder jaas = new StringBuilder(1024);
+ jaas.append("TestUser {\n" +
+ " com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
+ "};\n");
+ jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
+ jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
+
+ File jaasFile = new File(kdcWorkDir, "jaas.txt");
+ FileUtils.write(jaasFile, jaas.toString());
+ bindJVMtoJAASFile(jaasFile);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
deleted file mode 100644
index ca53096..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.eclipse.jetty.server.Server;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- *
- */
-public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
- public static final String TESTUSER = "testuser";
-
- class TestEmbeddedServer extends EmbeddedServer {
- public TestEmbeddedServer(int port, String path) throws IOException {
- super(port, path);
- }
-
- Server getServer() {
- return server;
- }
- }
-
- @Test(enabled = false)
- public void testSimpleLogin() throws Exception {
- String originalConf = System.getProperty("atlas.conf");
- System.setProperty("atlas.conf", System.getProperty("user.dir"));
- generateSimpleLoginConfiguration();
-
- TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
-
- try {
- startEmbeddedServer(server.getServer());
-
- URL url = new URL("http://localhost:23001");
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- try {
- assertEquals(connection.getResponseCode(), 403);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- url = new URL("http://localhost:23001/?user.name=testuser");
- connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- assertEquals(connection.getResponseCode(), 200);
- assertEquals(RequestContext.get().getUser(), TESTUSER);
- } finally {
- server.getServer().stop();
- if (originalConf != null) {
- System.setProperty("atlas.conf", originalConf);
- } else {
- System.clearProperty("atlas.conf");
- }
- }
-
-
- }
-
- protected void generateSimpleLoginConfiguration() throws IOException, ConfigurationException {
- Properties config = new Properties();
- config.setProperty("atlas.http.authentication.enabled", "true");
- config.setProperty("atlas.http.authentication.type", "simple");
-
- generateTestProperties(config);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
new file mode 100644
index 0000000..389eefe
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.eclipse.jetty.server.Server;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ */
+public class AtlasAuthenticationSimpleFilterTest extends BaseSecurityTest {
+ public static final String TESTUSER = "testuser";
+
+ class TestEmbeddedServer extends EmbeddedServer {
+ public TestEmbeddedServer(int port, String path) throws IOException {
+ super(port, path);
+ }
+
+ Server getServer() {
+ return server;
+ }
+ }
+
+ @Test(enabled = false)
+ public void testSimpleLogin() throws Exception {
+ String originalConf = System.getProperty("atlas.conf");
+ System.setProperty("atlas.conf", System.getProperty("user.dir"));
+ generateSimpleLoginConfiguration();
+
+ TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
+
+ try {
+ startEmbeddedServer(server.getServer());
+
+ URL url = new URL("http://localhost:23001");
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+ assertEquals(connection.getResponseCode(), Response.Status.BAD_REQUEST.getStatusCode());
+
+ url = new URL("http://localhost:23001/?user.name=testuser");
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), Response.Status.OK.getStatusCode());
+ assertEquals(RequestContext.get().getUser(), TESTUSER);
+ } finally {
+ server.getServer().stop();
+ if (originalConf != null) {
+ System.setProperty("atlas.conf", originalConf);
+ } else {
+ System.clearProperty("atlas.conf");
+ }
+ }
+ }
+
+ protected String generateSimpleLoginConfiguration() throws Exception {
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.setProperty("atlas.http.authentication.enabled", "true");
+ config.setProperty("atlas.http.authentication.type", "simple");
+ return writeConfiguration(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index 54d8d92..ab3aa23 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -25,9 +25,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
-
import kafka.consumer.ConsumerTimeoutException;
-
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
@@ -55,7 +53,6 @@ import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
-import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +62,6 @@ import org.testng.annotations.BeforeClass;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
-
import java.util.List;
/**
@@ -78,7 +74,8 @@ public abstract class BaseResourceIT {
protected WebResource service;
protected AtlasClient serviceClient;
public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
- protected static final int MAX_WAIT_TIME = 1000;
+ protected static final int MAX_WAIT_TIME = 60000;
+ protected String baseUrl;
@BeforeClass
public void setUp() throws Exception {
@@ -86,7 +83,7 @@ public abstract class BaseResourceIT {
DefaultClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
Configuration configuration = ApplicationProperties.get();
- String baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
+ baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
client.resource(UriBuilder.fromUri(baseUrl).build());
service = client.resource(UriBuilder.fromUri(baseUrl).build());
@@ -126,12 +123,12 @@ public abstract class BaseResourceIT {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
- JSONArray guids = serviceClient.createEntity(entityJSON);
+ List<String> guids = serviceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the reference to created instance with guid
- if (guids.length() > 0) {
- return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
+ if (guids.size() > 0) {
+ return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index 720ce79..aa92bc0 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -25,7 +25,6 @@ import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
-
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.EntityAuditEvent;
@@ -51,6 +50,7 @@ import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -64,7 +64,6 @@ import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -122,6 +121,22 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
@Test
+ public void testRequestUser() throws Exception {
+ Referenceable entity = new Referenceable(DATABASE_TYPE);
+ entity.set("name", randomString());
+ entity.set("description", randomString());
+
+ String user = "testuser";
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ AtlasClient localClient = new AtlasClient(ugi, null, baseUrl);
+ String entityId = localClient.createEntity(entity).get(0);
+
+ List<EntityAuditEvent> events = serviceClient.getEntityAuditEvents(entityId, (short) 10);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0).getUser(), user);
+ }
+
+ @Test
//API should accept single entity (or jsonarray of entities)
public void testSubmitSingleEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
@@ -149,7 +164,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
db.set("name", dbName);
db.set("description", randomString());
- final String dbid = serviceClient.createEntity(db).getString(0);
+ final String dbid = serviceClient.createEntity(db).get(0);
assertEntityAudit(dbid, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@@ -164,8 +179,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
- results = serviceClient.createEntity(db);
- assertEquals(results.length(), 0);
+ List<String> entityResults = serviceClient.createEntity(db);
+ assertEquals(entityResults.size(), 0);
try {
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
@@ -214,7 +229,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
//create entity for the type
Referenceable instance = new Referenceable(typeDefinition.typeName);
instance.set("name", randomString());
- String guid = serviceClient.createEntity(instance).getString(0);
+ String guid = serviceClient.createEntity(instance).get(0);
//update type - add attribute
typeDefinition = TypesUtil.createClassTypeDef(typeDefinition.typeName, ImmutableSet.<String>of(),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
index d497230..3d1a63a 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
*
*/
public class BaseSSLAndKerberosTest extends BaseSecurityTest {
+ public static final String TEST_USER_JAAS_SECTION = "TestUser";
public static final String TESTUSER = "testuser";
public static final String TESTPASS = "testpass";
protected static final String DGI_URL = "https://localhost:21443/";
@@ -104,7 +105,7 @@ public class BaseSSLAndKerberosTest extends BaseSecurityTest {
kdc.createPrincipal(TESTUSER, TESTPASS);
StringBuilder jaas = new StringBuilder(1024);
- jaas.append("TestUser {\n" +
+ jaas.append(TEST_USER_JAAS_SECTION + " {\n" +
" com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
"};\n");
jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
index 270a20d..54c570c 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
@@ -140,10 +140,6 @@ public class BaseSecurityTest {
}
public static String writeConfiguration(final PropertiesConfiguration configuration) throws Exception {
- String persistDir = TestUtils.getTempDirectory();
- TestUtils.writeConfiguration(configuration, persistDir + File.separator +
- ApplicationProperties.APPLICATION_PROPERTIES);
-
String confLocation = System.getProperty("atlas.conf");
URL url;
if (confLocation == null) {
@@ -153,6 +149,10 @@ public class BaseSecurityTest {
}
PropertiesConfiguration configuredProperties = new PropertiesConfiguration();
configuredProperties.load(url);
+
+ configuredProperties.copy(configuration);
+
+ String persistDir = TestUtils.getTempDirectory();
TestUtils.writeConfiguration(configuredProperties, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
ApplicationProperties.forceReload();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
index 521c037..8afcc26 100755
--- a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
@@ -19,7 +19,6 @@
package org.apache.atlas.web.security;
import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.web.service.SecureEmbeddedServer;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,7 @@ import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_
import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
public class SSLTest extends BaseSSLAndKerberosTest {
- private AtlasClient dgiCLient;
+ private AtlasClient atlasClient;
private Path jksPath;
private String providerUrl;
private TestSecureEmbeddedServer secureEmbeddedServer;
@@ -76,7 +75,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
String persistDir = writeConfiguration(configuration);
- dgiCLient = new AtlasClient(DGI_URL) {
+ atlasClient = new AtlasClient(DGI_URL) {
@Override
protected PropertiesConfiguration getClientProperties() {
return configuration;
@@ -139,6 +138,6 @@ public class SSLTest extends BaseSSLAndKerberosTest {
@Test
public void testService() throws Exception {
- dgiCLient.listTypes();
+ atlasClient.listTypes();
}
}
[2/2] incubator-atlas git commit: ATLAS-585 NotificationHookConsumer
creates new AtlasClient for every message (shwethags)
Posted by sh...@apache.org.
ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/1e3029bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/1e3029bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/1e3029bc
Branch: refs/heads/master
Commit: 1e3029bc7283e233dc816de7d83b28eddd4f4b36
Parents: 334429a
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri May 6 12:46:49 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri May 6 13:08:48 2016 +0530
----------------------------------------------------------------------
.gitignore | 6 +-
addons/falcon-bridge/pom.xml | 5 -
addons/hive-bridge/pom.xml | 5 -
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 8 +-
.../org/apache/atlas/hive/hook/HiveHookIT.java | 15 +-
addons/sqoop-bridge/pom.xml | 5 -
addons/storm-bridge/pom.xml | 5 -
.../java/org/apache/atlas/AtlasAdminClient.java | 2 +-
.../main/java/org/apache/atlas/AtlasClient.java | 150 ++++---
.../org/apache/atlas/AtlasServiceException.java | 21 +-
.../atlas/security/SecureClientUtils.java | 25 +-
.../java/org/apache/atlas/AtlasClientTest.java | 3 +-
distro/src/conf/atlas-log4j.xml | 2 +-
.../notification/NotificationHookConsumer.java | 265 ------------
.../NotificationHookConsumerTest.java | 183 ---------
pom.xml | 22 +-
release-log.txt | 1 +
.../graph/GraphBackedSearchIndexer.java | 6 +
webapp/pom.xml | 1 +
.../java/org/apache/atlas/LocalAtlasClient.java | 260 ++++++++++++
.../org/apache/atlas/LocalServletRequest.java | 400 +++++++++++++++++++
.../org/apache/atlas/examples/QuickStart.java | 9 +-
.../notification/NotificationHookConsumer.java | 259 ++++++++++++
.../web/filters/AtlasAuthenticationFilter.java | 53 ++-
.../apache/atlas/web/filters/AuditFilter.java | 2 +-
.../atlas/web/listeners/GuiceServletConfig.java | 3 +-
.../atlas/web/resources/AdminResource.java | 3 +-
.../atlas/web/resources/EntityResource.java | 4 +-
.../org/apache/atlas/web/util/Servlets.java | 32 ++
.../org/apache/atlas/LocalAtlasClientTest.java | 148 +++++++
.../NotificationHookConsumerIT.java | 34 +-
.../NotificationHookConsumerTest.java | 169 ++++++++
.../AtlasAuthenticationKerberosFilterIT.java | 190 ---------
.../AtlasAuthenticationKerberosFilterTest.java | 187 +++++++++
.../AtlasAuthenticationSimpleFilterIT.java | 98 -----
.../AtlasAuthenticationSimpleFilterTest.java | 89 +++++
.../atlas/web/resources/BaseResourceIT.java | 15 +-
.../web/resources/EntityJerseyResourceIT.java | 27 +-
.../web/security/BaseSSLAndKerberosTest.java | 3 +-
.../atlas/web/security/BaseSecurityTest.java | 8 +-
.../org/apache/atlas/web/security/SSLTest.java | 7 +-
41 files changed, 1797 insertions(+), 933 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b6fb8d8..b5a1c74 100755
--- a/.gitignore
+++ b/.gitignore
@@ -46,5 +46,9 @@ test-output
.DS_Store
*.swp
+#atlas data directory creates when tests are run from IDE
+**/atlas.data/**
+**/${sys:atlas.data}/**
+
#hbase package downloaded
-distro/hbase/*.tar.gz
\ No newline at end of file
+distro/hbase/*.tar.gz
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index 9b07c9f..14c6090 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -152,11 +152,6 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
- <artifactId>atlas-server-api</artifactId>
- <version>${project.version}</version>
- </artifactItem>
- <artifactItem>
- <groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index e125f18..eeb2aa4 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -239,11 +239,6 @@
<version>${project.version}</version>
</artifactItem>
<artifactItem>
- <groupId>${project.groupId}</groupId>
- <artifactId>atlas-server-api</artifactId>
- <version>${project.version}</version>
- </artifactItem>
- <artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 104c0c5..d4212a1 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -181,10 +180,10 @@ public class HiveMetaStoreBridge {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
- JSONArray guids = getAtlasClient().createEntity(entityJSON);
+ List<String> guids = getAtlasClient().createEntity(entityJSON);
LOG.debug("created instance for type " + typeName + ", guid: " + guids);
- return new Referenceable(guids.getString(0), referenceable.getTypeName(), null);
+ return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
}
/**
@@ -536,8 +535,7 @@ public class HiveMetaStoreBridge {
public static void main(String[] argv) throws Exception {
Configuration atlasConf = ApplicationProperties.get();
String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+ AtlasClient atlasClient = new AtlasClient(atlasEndpoint);
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index b0d4c5c..317d636 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
@@ -31,12 +30,7 @@ import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
@@ -51,7 +45,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -59,9 +52,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import java.io.File;
import java.text.ParseException;
import java.util.Date;
@@ -737,8 +727,6 @@ public class HiveHookIT {
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
- assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
@@ -749,6 +737,9 @@ public class HiveHookIT {
}
});
+ assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+ HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+
//Change name and add comment
oldColName = "name2";
newColName = "name3";
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 4b5dbb1..343bb4e 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -234,11 +234,6 @@
<version>${project.version}</version>
</artifactItem>
<artifactItem>
- <groupId>${project.groupId}</groupId>
- <artifactId>atlas-server-api</artifactId>
- <version>${project.version}</version>
- </artifactItem>
- <artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index d8d98f5..45ec846 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -190,11 +190,6 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
- <artifactId>atlas-server-api</artifactId>
- <version>${project.version}</version>
- </artifactItem>
- <artifactItem>
- <groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
index 473f72a..d2ae7f0 100644
--- a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
@@ -60,7 +60,7 @@ public class AtlasAdminClient {
Configuration configuration = ApplicationProperties.get();
String atlasServerUri = configuration.getString(
AtlasConstants.ATLAS_REST_ADDRESS_KEY, AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
- AtlasClient atlasClient = new AtlasClient(atlasServerUri, null, null);
+ AtlasClient atlasClient = new AtlasClient(atlasServerUri);
return handleCommand(commandLine, atlasServerUri, atlasClient);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index be34802..234af5b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -32,7 +32,6 @@ import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
@@ -78,6 +77,7 @@ public class AtlasClient {
public static final String COUNT = "count";
public static final String ROWS = "rows";
public static final String DATATYPE = "dataType";
+ public static final String STATUS = "Status";
public static final String EVENTS = "events";
public static final String START_KEY = "startKey";
@@ -115,6 +115,9 @@ public class AtlasClient {
// Setting the default value based on testing failovers while client code like quickstart is running.
public static final int DEFAULT_NUM_RETRIES = 4;
public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+
+ public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
+
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
@@ -124,28 +127,20 @@ public class AtlasClient {
private Configuration configuration;
/**
- * Create a new AtlasClient.
- *
- * @param baseUrl The URL of the Atlas server to connect to.
- */
- public AtlasClient(String baseUrl) {
- this(baseUrl, null, null);
- }
-
- /**
- * Create a new Atlas Client.
- * @param baseUrl The URL of the Atlas server to connect to.
- * @param ugi The {@link UserGroupInformation} of logged in user.
- * @param doAsUser The user on whose behalf queries will be executed.
+ * Create a new Atlas client.
+ * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
+ * High Availability mode. The client will automatically determine the
+ * active instance on startup and also when there is a scenario of
+ * failover.
*/
- public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
- initializeState(new String[] {baseUrl}, ugi, doAsUser);
+ public AtlasClient(String... baseUrls) throws AtlasException {
+ this(getCurrentUGI(), baseUrls);
}
/**
* Create a new Atlas client.
- * @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
- * @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
+ * @param ugi UserGroupInformation
+ * @param doAsUser
* @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
* High Availability mode. The client will automatically determine the
* active instance on startup and also when there is a scenario of
@@ -155,6 +150,23 @@ public class AtlasClient {
initializeState(baseUrls, ugi, doAsUser);
}
+ private static UserGroupInformation getCurrentUGI() throws AtlasException {
+ try {
+ return UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new AtlasException(e);
+ }
+ }
+
+ private AtlasClient(UserGroupInformation ugi, String[] baseUrls) {
+ this(ugi, ugi.getShortUserName(), baseUrls);
+ }
+
+ //Used by LocalAtlasClient
+ protected AtlasClient() {
+ //Do nothing
+ }
+
private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
configuration = getClientProperties();
Client client = getClient(configuration, ugi, doAsUser);
@@ -340,7 +352,7 @@ public class AtlasClient {
WebResource resource = getResource(service, API.STATUS);
JSONObject response = callAPIWithResource(API.STATUS, resource, null);
try {
- result = response.getString("Status");
+ result = response.getString(STATUS);
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
@@ -418,12 +430,14 @@ public class AtlasClient {
public List<String> createType(String typeAsJson) throws AtlasServiceException {
LOG.debug("Creating type definition: {}", typeAsJson);
JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
- return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+ List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
@Override
String extractElement(JSONObject element) throws JSONException {
return element.getString(AtlasClient.NAME);
}
});
+ LOG.debug("Create type definition returned results: {}", results);
+ return results;
}
/**
@@ -470,14 +484,16 @@ public class AtlasClient {
* @throws AtlasServiceException
*/
public List<String> updateType(String typeAsJson) throws AtlasServiceException {
- LOG.debug("Updating tyep definition: {}", typeAsJson);
+ LOG.debug("Updating type definition: {}", typeAsJson);
JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
- return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+ List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
@Override
String extractElement(JSONObject element) throws JSONException {
return element.getString(AtlasClient.NAME);
}
});
+ LOG.debug("Update type definition returned results: {}", results);
+ return results;
}
/**
@@ -495,10 +511,11 @@ public class AtlasClient {
return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
- public String getType(String typeName) throws AtlasServiceException {
+ public TypesDef getType(String typeName) throws AtlasServiceException {
try {
JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
- return response.getString(DEFINITION);
+ String typeJson = response.getString(DEFINITION);
+ return TypesSerialization.fromJson(typeJson);
} catch (AtlasServiceException e) {
if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
return null;
@@ -515,14 +532,12 @@ public class AtlasClient {
* @return json array of guids
* @throws AtlasServiceException
*/
- public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
+ protected List<String> createEntity(JSONArray entities) throws AtlasServiceException {
LOG.debug("Creating entities: {}", entities);
JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
- try {
- return response.getJSONArray(GUID);
- } catch (JSONException e) {
- throw new AtlasServiceException(API.GET_ENTITY, e);
- }
+ List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Create entities returned results: {}", results);
+ return results;
}
/**
@@ -531,15 +546,15 @@ public class AtlasClient {
* @return json array of guids
* @throws AtlasServiceException
*/
- public JSONArray createEntity(String... entitiesAsJson) throws AtlasServiceException {
+ public List<String> createEntity(String... entitiesAsJson) throws AtlasServiceException {
return createEntity(new JSONArray(Arrays.asList(entitiesAsJson)));
}
- public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
+ public List<String> createEntity(Referenceable... entities) throws AtlasServiceException {
return createEntity(Arrays.asList(entities));
}
- public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
+ public List<String> createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entityArray = getEntitiesArray(entities);
return createEntity(entityArray);
}
@@ -559,19 +574,21 @@ public class AtlasClient {
* @return json array of guids which were updated/created
* @throws AtlasServiceException
*/
- public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
+ public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException {
return updateEntities(Arrays.asList(entities));
}
- public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
+ protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException {
+ LOG.debug("Updating entities: {}", entities);
+ JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString());
+ List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Update entities returned results: {}", results);
+ return results;
+ }
+
+ public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entitiesArray = getEntitiesArray(entities);
- LOG.debug("Updating entities: {}", entitiesArray);
- JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
- try {
- return response.getJSONArray(GUID);
- } catch (JSONException e) {
- throw new AtlasServiceException(API.UPDATE_ENTITY, e);
- }
+ return updateEntities(entitiesArray);
}
/**
@@ -651,6 +668,8 @@ public class AtlasClient {
Referenceable entity) throws AtlasServiceException {
final API api = API.UPDATE_ENTITY_PARTIAL;
String entityJson = InstanceSerialization.toJson(entity, true);
+ LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+ uniqueAttributeName, uniqueAttributeValue, entityJson);
JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
@Override
public WebResource createResource() {
@@ -661,10 +680,16 @@ public class AtlasClient {
return resource;
}
});
+ String result = getString(response, GUID);
+ LOG.debug("Update entity returned result: {}", result);
+ return result;
+ }
+
+ protected String getString(JSONObject jsonObject, String parameter) throws AtlasServiceException {
try {
- return response.getString(GUID);
+ return jsonObject.getString(parameter);
} catch (JSONException e) {
- throw new AtlasServiceException(api, e);
+ throw new AtlasServiceException(e);
}
}
@@ -676,6 +701,7 @@ public class AtlasClient {
* @throws AtlasServiceException
*/
public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+ LOG.debug("Deleting entities: {}", guids);
JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
@Override
public WebResource createResource() {
@@ -687,7 +713,9 @@ public class AtlasClient {
return resource;
}
});
- return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Delete entities returned results: {}", results);
+ return results;
}
/**
@@ -699,13 +727,17 @@ public class AtlasClient {
*/
public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
throws AtlasServiceException {
+ LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+ uniqueAttributeValue);
API api = API.DELETE_ENTITY;
WebResource resource = getResource(api);
resource = resource.queryParam(TYPE, entityType);
resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
- return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Delete entities returned results: {}", results);
+ return results;
}
/**
@@ -789,13 +821,13 @@ public class AtlasClient {
return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
- private class ExtractOperation<T, U> {
+ protected class ExtractOperation<T, U> {
T extractElement(U element) throws JSONException {
return (T) element;
}
}
- private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
+ protected <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
throws AtlasServiceException {
try {
JSONArray results = jsonResponse.getJSONArray(key);
@@ -1011,22 +1043,12 @@ public class AtlasClient {
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
- private final UserGroupInformation ugi;
- private final String doAsUser;
+ private String doAsUser;
+ private UserGroupInformation ugi;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
- this.ugi = ugi;
- this.doAsUser = doAsUser;
- }
-
- public UserGroupInformation getUgi() {
- return ugi;
- }
-
- public String getDoAsUser() {
- return doAsUser;
}
public Client getClient() {
@@ -1036,6 +1058,14 @@ public class AtlasClient {
public String[] getBaseUrls() {
return baseUrls;
}
+
+ public String getDoAsUser() {
+ return doAsUser;
+ }
+
+ public UserGroupInformation getUgi() {
+ return ugi;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/AtlasServiceException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasServiceException.java b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
index 6f68a71..2117a6b 100755
--- a/client/src/main/java/org/apache/atlas/AtlasServiceException.java
+++ b/client/src/main/java/org/apache/atlas/AtlasServiceException.java
@@ -19,6 +19,10 @@
package org.apache.atlas;
import com.sun.jersey.api.client.ClientResponse;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.WebApplicationException;
public class AtlasServiceException extends Exception {
private ClientResponse.Status status;
@@ -27,12 +31,19 @@ public class AtlasServiceException extends Exception {
super("Metadata service API " + api + " failed", e);
}
+ public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException {
+ this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
+ ((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
+ }
+
+ private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) {
+ super("Metadata service API " + api + " failed with status " + status.getStatusCode() + "(" +
+ status.getReasonPhrase() + ") Response Body (" + response + ")");
+ this.status = status;
+ }
+
public AtlasServiceException(AtlasClient.API api, ClientResponse response) {
- super("Metadata service API " + api + " failed with status " +
- response.getClientResponseStatus().getStatusCode() + "(" +
- response.getClientResponseStatus().getReasonPhrase() + ") Response Body (" +
- response.getEntity(String.class) + ")");
- this.status = response.getClientResponseStatus();
+ this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
}
public AtlasServiceException(Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
index d3b474a..1686112 100644
--- a/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
+++ b/client/src/main/java/org/apache/atlas/security/SecureClientUtils.java
@@ -20,6 +20,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.AtlasException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
@@ -60,7 +61,7 @@ public class SecureClientUtils {
public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
- org.apache.commons.configuration.Configuration clientConfig, final String doAsUser,
+ org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
final UserGroupInformation ugi) {
config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
Configuration conf = new Configuration();
@@ -80,17 +81,16 @@ public class SecureClientUtils {
final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
HttpURLConnectionFactory httpURLConnectionFactory = null;
try {
- UserGroupInformation ugiToUse = ugi != null ?
- ugi : UserGroupInformation.getCurrentUser();
+ UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser();
final UserGroupInformation actualUgi =
- (ugiToUse.getAuthenticationMethod() ==
- UserGroupInformation.AuthenticationMethod.PROXY)
- ? ugiToUse.getRealUser()
- : ugiToUse;
- LOG.info("Real User: {}, is from ticket cache? {}",
- actualUgi,
- actualUgi.isLoginTicketBased());
+ (ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY)
+ ? ugiToUse.getRealUser() : ugiToUse;
+ LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased());
+ if (StringUtils.isEmpty(doAsUser)) {
+ doAsUser = actualUgi.getShortUserName();
+ }
LOG.info("doAsUser: {}", doAsUser);
+ final String finalDoAsUser = doAsUser;
httpURLConnectionFactory = new HttpURLConnectionFactory() {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
@@ -99,9 +99,8 @@ public class SecureClientUtils {
@Override
public HttpURLConnection run() throws Exception {
try {
- return new DelegationTokenAuthenticatedURL(
- finalAuthenticator, connConfigurator)
- .openConnection(url, token, doAsUser);
+ return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator)
+ .openConnection(url, token, finalDoAsUser);
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
index 8911bf5..0e80573 100644
--- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -30,14 +30,12 @@ import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
-
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -326,6 +324,7 @@ public class AtlasClientTest {
thenReturn(response);
when(resourceCreator.createResource()).thenReturn(resourceObject);
+ when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");
AtlasClient atlasClient = getClientForTest("http://localhost:31000");
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 1ac4082..17bf68f 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -61,7 +61,7 @@
</logger>
<root>
- <priority value="info"/>
+ <priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
deleted file mode 100644
index 1f2df3e..0000000
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import kafka.consumer.ConsumerTimeoutException;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.service.Service;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Consumer of notifications from hooks e.g., hive hook etc.
- */
-@Singleton
-public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
- private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
-
- public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
- public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
- public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-
- private NotificationInterface notificationInterface;
- private ExecutorService executors;
- private String atlasEndpoint;
- private Configuration applicationProperties;
- private List<HookConsumer> consumers;
-
- @Inject
- public NotificationHookConsumer(NotificationInterface notificationInterface) {
- this.notificationInterface = notificationInterface;
- }
-
- @Override
- public void start() throws AtlasException {
- Configuration configuration = ApplicationProperties.get();
- startInternal(configuration, null);
- }
-
- void startInternal(Configuration configuration,
- ExecutorService executorService) {
- this.applicationProperties = configuration;
- this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
- if (consumers == null) {
- consumers = new ArrayList<>();
- }
- if (executorService != null) {
- executors = executorService;
- }
- if (!HAConfiguration.isHAEnabled(configuration)) {
- LOG.info("HA is disabled, starting consumers inline.");
- startConsumers(executorService);
- }
- }
-
- private void startConsumers(ExecutorService executorService) {
- int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
- List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
- notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
- if (executorService == null) {
- executorService = Executors.newFixedThreadPool(notificationConsumers.size());
- }
- executors = executorService;
- for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
- HookConsumer hookConsumer = new HookConsumer(consumer);
- consumers.add(hookConsumer);
- executors.submit(hookConsumer);
- }
- }
-
- @Override
- public void stop() {
- //Allow for completion of outstanding work
- notificationInterface.close();
- try {
- if (executors != null) {
- stopConsumerThreads();
- executors.shutdownNow();
- if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
- LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
- }
- executors = null;
- }
- } catch (InterruptedException e) {
- LOG.error("Failure in shutting down consumers");
- }
- }
-
- private void stopConsumerThreads() {
- if (consumers != null) {
- for (HookConsumer consumer : consumers) {
- consumer.stop();
- }
- consumers.clear();
- }
- }
-
- /**
- * Start Kafka consumer threads that read from Kafka topic when server is activated.
- *
- * Since the consumers create / update entities to the shared backend store, only the active instance
- * should perform this activity. Hence, these threads are started only on server activation.
- */
- @Override
- public void instanceIsActive() {
- LOG.info("Reacting to active state: initializing Kafka consumers");
- startConsumers(executors);
- }
-
- /**
- * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
- *
- * Since the consumers create / update entities to the shared backend store, only the active instance
- * should perform this activity. Hence, these threads are stopped only on server deactivation.
- */
- @Override
- public void instanceIsPassive() {
- LOG.info("Reacting to passive state: shutting down Kafka consumers.");
- stop();
- }
-
- static class Timer {
- public void sleep(int interval) throws InterruptedException {
- Thread.sleep(interval);
- }
- }
-
- class HookConsumer implements Runnable {
- private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
- private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-
- public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
- this.consumer = consumer;
- }
-
- private boolean hasNext() {
- try {
- return consumer.hasNext();
- } catch (ConsumerTimeoutException e) {
- return false;
- }
- }
-
- @Override
- public void run() {
- shouldRun.set(true);
-
- if (!serverAvailable(new NotificationHookConsumer.Timer())) {
- return;
- }
-
- while (shouldRun.get()) {
- try {
- if (hasNext()) {
- HookNotification.HookNotificationMessage message = consumer.next();
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
- AtlasClient atlasClient = getAtlasClient(ugi);
-
- try {
- switch (message.getType()) {
- case ENTITY_CREATE:
- HookNotification.EntityCreateRequest createRequest =
- (HookNotification.EntityCreateRequest) message;
- atlasClient.createEntity(createRequest.getEntities());
- break;
-
- case ENTITY_PARTIAL_UPDATE:
- HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
- (HookNotification.EntityPartialUpdateRequest) message;
- atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
- partialUpdateRequest.getAttribute(),
- partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
- break;
-
- case ENTITY_DELETE:
- HookNotification.EntityDeleteRequest deleteRequest =
- (HookNotification.EntityDeleteRequest) message;
- atlasClient.deleteEntity(deleteRequest.getTypeName(),
- deleteRequest.getAttribute(),
- deleteRequest.getAttributeValue());
- break;
-
- case ENTITY_FULL_UPDATE:
- HookNotification.EntityUpdateRequest updateRequest =
- (HookNotification.EntityUpdateRequest) message;
- atlasClient.updateEntities(updateRequest.getEntities());
- break;
-
- default:
- throw new IllegalStateException("Unhandled exception!");
- }
- } catch (Exception e) {
- //todo handle failures
- LOG.warn("Error handling message {}", message, e);
- }
- }
- } catch (Throwable t) {
- LOG.warn("Failure in NotificationHookConsumer", t);
- }
- }
- }
-
- protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
- return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
- }
-
- boolean serverAvailable(Timer timer) {
- try {
- AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
- while (!atlasClient.isServerReady()) {
- try {
- LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
- SERVER_READY_WAIT_TIME_MS);
- timer.sleep(SERVER_READY_WAIT_TIME_MS);
- } catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for Atlas Server to become ready, "
- + "exiting consumer thread.", e);
- return false;
- }
- }
- } catch (Throwable e) {
- LOG.info(
- "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
- + "exiting consumer thread.", e);
- return false;
- }
- LOG.info("Atlas Server is ready, can start reading Kafka events.");
- return true;
- }
-
- public void stop() {
- shouldRun.set(false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
deleted file mode 100644
index 177de6d..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.atlas.ha.HAConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import static org.mockito.Mockito.*;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
-
-public class NotificationHookConsumerTest {
-
- @Mock
- private NotificationInterface notificationInterface;
-
- @Mock
- private AtlasClient atlasClient;
-
- @Mock
- private Configuration configuration;
-
- @Mock
- private ExecutorService executorService;
-
- @BeforeMethod
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
- @Override
- protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
- return atlasClient;
- }
- };
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
- when(atlasClient.isServerReady()).thenReturn(true);
-
- assertTrue(hookConsumer.serverAvailable(timer));
-
- verifyZeroInteractions(timer);
- }
-
- @Test
- public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
- @Override
- protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
- return atlasClient;
- }
- };
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
- when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
-
- assertTrue(hookConsumer.serverAvailable(timer));
-
- verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
- }
-
- @Test
- public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
- @Override
- protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
- return atlasClient;
- }
- };
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
- doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
- when(atlasClient.isServerReady()).thenReturn(false);
-
- assertFalse(hookConsumer.serverAvailable(timer));
- }
-
- @Test
- public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
- @Override
- protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
- return atlasClient;
- }
- };
- NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
- when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
- new Exception()));
-
- assertFalse(hookConsumer.serverAvailable(timer));
- }
-
- @Test
- public void testConsumersStartedIfHAIsDisabled() {
- when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
- when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- notificationHookConsumer.startInternal(configuration, executorService);
- verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
- verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
- }
-
- @Test
- public void testConsumersAreNotStartedIfHAIsEnabled() {
- when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
- when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- notificationHookConsumer.startInternal(configuration, executorService);
- verifyZeroInteractions(notificationInterface);
- }
-
- @Test
- public void testConsumersAreStartedWhenInstanceBecomesActive() {
- when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
- when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- notificationHookConsumer.startInternal(configuration, executorService);
- notificationHookConsumer.instanceIsActive();
- verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
- verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
- }
-
- @Test
- public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
- when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
- when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
- List<NotificationConsumer<Object>> consumers = new ArrayList();
- consumers.add(mock(NotificationConsumer.class));
- when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
- thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
- notificationHookConsumer.startInternal(configuration, executorService);
- notificationHookConsumer.instanceIsPassive();
- verify(notificationInterface).close();
- verify(executorService).shutdownNow();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 27d44cf..7b872c3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1371,7 +1371,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
- <version>2.4</version>
+ <version>2.6</version>
</plugin>
<plugin>
@@ -1657,15 +1657,17 @@
<exclude>**/overlays/**</exclude>
<exclude>dev-support/**</exclude>
<exclude>**/users-credentials.properties</exclude>
- <exclude>**/public/css/animate.min.css</exclude>
- <exclude>**/public/css/fonts/**</exclude>
- <exclude>**/public/css/font-awesome.min.css</exclude>
- <exclude>**/public/js/require-handlebars-plugin/**</exclude>
- <exclude>**/node_modules/**</exclude>
- <!-- All the npm plugins are copied here, so exclude it -->
- <exclude>**/public/js/libs/**</exclude>
-
-
+ <exclude>**/public/css/animate.min.css</exclude>
+ <exclude>**/public/css/fonts/**</exclude>
+ <exclude>**/public/css/font-awesome.min.css</exclude>
+ <exclude>**/public/js/require-handlebars-plugin/**</exclude>
+ <exclude>**/node_modules/**</exclude>
+ <!-- All the npm plugins are copied here, so exclude it -->
+ <exclude>**/public/js/libs/**</exclude>
+
+ <!-- atlas data directory creates when tests are run from IDE -->
+ <exclude>**/atlas.data/**</exclude>
+ <exclude>**/${sys:atlas.data}/**</exclude>
</excludes>
</configuration>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a2de92a..7c17a9d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth)
ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth)
ATLAS-659 atlas_start fails on Windows (dkantor via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index d83c08c..9b9fe35 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -106,6 +106,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// create a composite index for entity state
createCompositeAndMixedIndex(management, Constants.STATE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+ // create a composite index for entity state
+ createCompositeAndMixedIndex(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
+ // create a composite index for entity state
+ createCompositeAndMixedIndex(management, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
+
// create a composite and mixed index for type since it can be combined with other keys
createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
true);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 393863c..de48c15 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -301,6 +301,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<configuration>
+ <archiveClasses>true</archiveClasses>
<attachClasses>true</attachClasses>
<overlays>
<!-- <overlay>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
new file mode 100644
index 0000000..c6ed85d
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.inject.Inject;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.web.filters.AuditFilter;
+import org.apache.atlas.web.resources.EntityResource;
+import org.apache.atlas.web.service.ServiceState;
+import org.apache.atlas.web.util.DateTimeHelper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer.
+ */
+public class LocalAtlasClient extends AtlasClient {
+ private static final String LOCALHOST = "localhost";
+ private static final String CLASS = LocalAtlasClient.class.getSimpleName();
+
+ public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class);
+
+ private final EntityResource entityResource;
+
+ private final ServiceState serviceState;
+
+ @Inject
+ public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) {
+ super();
+ this.serviceState = serviceState;
+ this.entityResource = entityResource;
+ }
+
+ private String user;
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ private void setRequestContext() {
+ RequestContext requestContext = RequestContext.createContext();
+ requestContext.setUser(user);
+ }
+
+ @Override
+ public boolean isServerReady() throws AtlasServiceException {
+ return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
+ }
+
+ @Override
+ protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException {
+ LOG.debug("Creating entities: {}", entities);
+ EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) {
+ @Override
+ Response invoke() {
+ return entityResource.submit(new LocalServletRequest(entities.toString()));
+ }
+ };
+ JSONObject response = entityOperation.run();
+ List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Create entities returned results: {}", results);
+ return results;
+ }
+
+ @Override
+ protected List<String> updateEntities(final JSONArray entities) throws AtlasServiceException {
+ LOG.debug("Updating entities: {}", entities);
+ EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
+ @Override
+ Response invoke() {
+ return entityResource.updateEntities(new LocalServletRequest(entities.toString()));
+ }
+ };
+ JSONObject response = entityOperation.run();
+ List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Update entities returned results: {}", results);
+ return results;
+ }
+
+ private abstract class EntityOperation {
+ private final API api;
+
+ public EntityOperation(API api) {
+ this.api = api;
+ }
+
+ public JSONObject run() throws AtlasServiceException {
+ setRequestContext();
+ AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
+
+ try {
+ Response response = invoke();
+ return (JSONObject) response.getEntity();
+ } catch(WebApplicationException e) {
+ try {
+ throw new AtlasServiceException(api, e);
+ } catch (JSONException e1) {
+ throw new AtlasServiceException(e);
+ }
+ }
+ }
+
+ abstract Response invoke();
+ }
+
+ @Override
+ public String updateEntity(final String entityType, final String uniqueAttributeName,
+ final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
+ final String entityJson = InstanceSerialization.toJson(entity, true);
+ LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
+ uniqueAttributeName, uniqueAttributeValue, entityJson);
+ EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) {
+ @Override
+ Response invoke() {
+ return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue,
+ new LocalServletRequest(entityJson));
+ }
+ };
+ JSONObject response = entityOperation.run();
+ String result = getString(response, GUID);
+ LOG.debug("Update entity returned result: {}", result);
+ return result;
+ }
+
+ @Override
+ public List<String> deleteEntity(final String entityType, final String uniqueAttributeName,
+ final String uniqueAttributeValue) throws AtlasServiceException {
+ LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
+ uniqueAttributeValue);
+ EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) {
+ @Override
+ Response invoke() {
+ return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue);
+ }
+ };
+ JSONObject response = entityOperation.run();
+ List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
+ LOG.debug("Delete entities returned results: {}", results);
+ return results;
+ }
+
+ @Override
+ public String getAdminStatus() throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public List<String> createType(String typeAsJson) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public List<String> updateType(String typeAsJson) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public List<String> listTypes() throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public TypesDef getType(String typeName) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+
+ @Override
+ public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public Referenceable getEntity(String guid) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public Referenceable getEntity(final String entityType, final String attribute, final String value)
+ throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public List<String> listEntities(final String entityType) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
+ throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONArray search(final String searchQuery) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONArray searchByDSL(final String query) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONObject searchByFullText(final String query) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+
+ @Override
+ public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
+ throw new IllegalStateException("Not supported in LocalAtlasClient");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1e3029bc/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
new file mode 100644
index 0000000..36a01b2
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/LocalServletRequest.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
+import javax.servlet.http.Part;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+
+public class LocalServletRequest implements HttpServletRequest {
+ private final String payload;
+
+ LocalServletRequest(String payload) {
+ this.payload = payload;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ @Override
+ public String getAuthType() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Cookie[] getCookies() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public long getDateHeader(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getHeader(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Enumeration<String> getHeaders(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Enumeration<String> getHeaderNames() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public int getIntHeader(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getMethod() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getPathInfo() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getPathTranslated() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getContextPath() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getQueryString() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRemoteUser() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isUserInRole(String role) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRequestedSessionId() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRequestURI() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public StringBuffer getRequestURL() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getServletPath() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public HttpSession getSession(boolean create) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public HttpSession getSession() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String changeSessionId() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isRequestedSessionIdValid() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromCookie() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromURL() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromUrl() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean authenticate(HttpServletResponse response) throws IOException, ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public void login(String username, String password) throws ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public void logout() throws ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Collection<Part> getParts() throws IOException, ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Part getPart(String name) throws IOException, ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Object getAttribute(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Enumeration<String> getAttributeNames() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public int getContentLength() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public long getContentLengthLong() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getContentType() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getParameter(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Enumeration<String> getParameterNames() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String[] getParameterValues(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Map<String, String[]> getParameterMap() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getProtocol() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getScheme() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getServerName() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public int getServerPort() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRemoteAddr() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRemoteHost() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public void setAttribute(String name, Object o) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public void removeAttribute(String name) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Locale getLocale() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public Enumeration<Locale> getLocales() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isSecure() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public RequestDispatcher getRequestDispatcher(String path) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getRealPath(String path) {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public int getRemotePort() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getLocalName() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public String getLocalAddr() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public int getLocalPort() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public ServletContext getServletContext() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public AsyncContext startAsync() throws IllegalStateException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
+ throws IllegalStateException {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isAsyncStarted() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public boolean isAsyncSupported() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public AsyncContext getAsyncContext() {
+ throw new IllegalStateException("Not supported");
+ }
+
+ @Override
+ public DispatcherType getDispatcherType() {
+ throw new IllegalStateException("Not supported");
+ }
+}