You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/03/31 12:51:22 UTC
[1/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit
with DefaultMetadataService (shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 985465fc7 -> bca454e16
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index e64e949..f38cffe 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -33,6 +33,8 @@ import static org.testng.Assert.assertEquals;
@Guice(modules = NotificationModule.class)
public class NotificationHookConsumerIT extends BaseResourceIT {
+ private static final String TEST_USER = "testuser";
+
@Inject
private NotificationInterface kafka;
@@ -57,7 +59,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
entity.set("name", "db" + randomString());
entity.set("description", randomString());
- sendHookMessage(new HookNotification.EntityCreateRequest(entity));
+ sendHookMessage(new HookNotification.EntityCreateRequest(TEST_USER, entity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
@@ -79,7 +81,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
newEntity.set("owner", randomString());
- sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
+ sendHookMessage(
+ new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, "name", dbName, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
@@ -105,7 +108,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final String newName = "db" + randomString();
newEntity.set("name", newName);
- sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
+ sendHookMessage(
+ new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, "name", dbName, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
@@ -135,7 +139,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
newEntity.set("owner", randomString());
//updating unique attribute
- sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
+ sendHookMessage(new HookNotification.EntityUpdateRequest(TEST_USER, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
new file mode 100644
index 0000000..9e1e08f
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationKerberosFilterIT.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.testng.annotations.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ */
+public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
+ public static final String TEST_USER_JAAS_SECTION = "TestUser";
+ public static final String TESTUSER = "testuser";
+ public static final String TESTPASS = "testpass";
+
+ private File userKeytabFile;
+ private File httpKeytabFile;
+
+ class TestEmbeddedServer extends EmbeddedServer {
+ public TestEmbeddedServer(int port, String path) throws IOException {
+ super(port, path);
+ }
+
+ Server getServer() {
+ return server;
+ }
+
+ @Override
+ protected WebAppContext getWebAppContext(String path) {
+ WebAppContext application = new WebAppContext(path, "/");
+ application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+ application.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return application;
+ }
+ }
+
+ @Test(enabled = false)
+ public void testKerberosBasedLogin() throws Exception {
+ String originalConf = System.getProperty("atlas.conf");
+ System.setProperty("atlas.conf", System.getProperty("user.dir"));
+
+ setupKDCAndPrincipals();
+ TestEmbeddedServer server = null;
+
+ try {
+ // setup the atlas-application.properties file
+ generateKerberosTestProperties();
+
+ // need to create the web application programmatically in order to control the injection of the test
+ // application properties
+ server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
+
+ startEmbeddedServer(server.getServer());
+
+ final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
+ // attempt to hit server and get rejected
+ URL url = new URL("http://localhost:23000/");
+ HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), 401);
+
+ // need to populate the ticket cache with a local user, so logging in...
+ Subject subject = loginTestUser();
+
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // attempt to hit server and get rejected
+ URL url = new URL("http://localhost:23000/");
+ HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), 200);
+ assertEquals(RequestContext.get().getUser(), TESTUSER);
+ return null;
+ }
+ });
+ } finally {
+ server.getServer().stop();
+ kdc.stop();
+
+ if (originalConf != null) {
+ System.setProperty("atlas.conf", originalConf);
+ } else {
+ System.clearProperty("atlas.conf");
+ }
+
+ }
+
+
+ }
+
+ protected Subject loginTestUser() throws LoginException, IOException {
+ LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (int i = 0; i < callbacks.length; i++) {
+ if (callbacks[i] instanceof PasswordCallback) {
+ PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
+ passwordCallback.setPassword(TESTPASS.toCharArray());
+ }
+ if (callbacks[i] instanceof NameCallback) {
+ NameCallback nameCallback = (NameCallback) callbacks[i];
+ nameCallback.setName(TESTUSER);
+ }
+ }
+ }
+ });
+ // attempt authentication
+ lc.login();
+ return lc.getSubject();
+ }
+
+ protected void generateKerberosTestProperties() throws IOException, ConfigurationException {
+ Properties props = new Properties();
+ props.setProperty("atlas.http.authentication.enabled", "true");
+ props.setProperty("atlas.http.authentication.type", "kerberos");
+ props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
+ props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
+ props.setProperty("atlas.http.authentication.kerberos.name.rules",
+ "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
+
+ generateTestProperties(props);
+ }
+
+ public void setupKDCAndPrincipals() throws Exception {
+ // set up the KDC
+ File kdcWorkDir = startKDC();
+
+ userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
+ httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
+
+ // create a test user principal
+ kdc.createPrincipal(TESTUSER, TESTPASS);
+
+ StringBuilder jaas = new StringBuilder(1024);
+ jaas.append("TestUser {\n" +
+ " com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
+ "};\n");
+ jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
+ jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
+
+ File jaasFile = new File(kdcWorkDir, "jaas.txt");
+ FileUtils.write(jaasFile, jaas.toString());
+ bindJVMtoJAASFile(jaasFile);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
new file mode 100644
index 0000000..ca53096
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/filters/AtlasAuthenticationSimpleFilterIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.filters;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.web.security.BaseSecurityTest;
+import org.apache.atlas.web.service.EmbeddedServer;
+import org.apache.commons.configuration.ConfigurationException;
+import org.eclipse.jetty.server.Server;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ */
+public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
+ public static final String TESTUSER = "testuser";
+
+ class TestEmbeddedServer extends EmbeddedServer {
+ public TestEmbeddedServer(int port, String path) throws IOException {
+ super(port, path);
+ }
+
+ Server getServer() {
+ return server;
+ }
+ }
+
+ @Test(enabled = false)
+ public void testSimpleLogin() throws Exception {
+ String originalConf = System.getProperty("atlas.conf");
+ System.setProperty("atlas.conf", System.getProperty("user.dir"));
+ generateSimpleLoginConfiguration();
+
+ TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
+
+ try {
+ startEmbeddedServer(server.getServer());
+
+ URL url = new URL("http://localhost:23001");
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ try {
+ assertEquals(connection.getResponseCode(), 403);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ url = new URL("http://localhost:23001/?user.name=testuser");
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+
+ assertEquals(connection.getResponseCode(), 200);
+ assertEquals(RequestContext.get().getUser(), TESTUSER);
+ } finally {
+ server.getServer().stop();
+ if (originalConf != null) {
+ System.setProperty("atlas.conf", originalConf);
+ } else {
+ System.clearProperty("atlas.conf");
+ }
+ }
+
+
+ }
+
+ protected void generateSimpleLoginConfiguration() throws IOException, ConfigurationException {
+ Properties config = new Properties();
+ config.setProperty("atlas.http.authentication.enabled", "true");
+ config.setProperty("atlas.http.authentication.type", "simple");
+
+ generateTestProperties(config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationKerberosFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationKerberosFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationKerberosFilterIT.java
deleted file mode 100644
index a658497..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationKerberosFilterIT.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.eclipse.jetty.server.Server;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
-
-/**
- *
- */
-public class MetadataAuthenticationKerberosFilterIT extends BaseSecurityTest {
- public static final String TEST_USER_JAAS_SECTION = "TestUser";
- public static final String TESTUSER = "testuser";
- public static final String TESTPASS = "testpass";
-
- private File userKeytabFile;
- private File httpKeytabFile;
-
- class TestEmbeddedServer extends EmbeddedServer {
- public TestEmbeddedServer(int port, String path) throws IOException {
- super(port, path);
- }
-
- Server getServer() {
- return server;
- }
- }
-
- @Test(enabled = false)
- public void testKerberosBasedLogin() throws Exception {
- String originalConf = System.getProperty("atlas.conf");
- System.setProperty("atlas.conf", System.getProperty("user.dir"));
-
- setupKDCAndPrincipals();
- TestEmbeddedServer server = null;
-
- try {
- // setup the atlas-application.properties file
- generateKerberosTestProperties();
-
- // need to create the web application programmatically in order to control the injection of the test
- // application properties
- server = new TestEmbeddedServer(23000, "webapp/target/apache-atlas");
-
- startEmbeddedServer(server.getServer());
-
- final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
- // attempt to hit server and get rejected
- URL url = new URL("http://localhost:23000/");
- HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, false);
- connection.setRequestMethod("GET");
- connection.connect();
-
- Assert.assertEquals(connection.getResponseCode(), 401);
-
- // need to populate the ticket cache with a local user, so logging in...
- Subject subject = loginTestUser();
-
- Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- // attempt to hit server and get rejected
- URL url = new URL("http://localhost:23000/");
- HttpURLConnection connection = (HttpURLConnection) connectionFactory.openConnection(url, true);
- connection.setRequestMethod("GET");
- connection.connect();
-
- Assert.assertEquals(connection.getResponseCode(), 200);
-
- return null;
- }
- });
- } finally {
- server.getServer().stop();
- kdc.stop();
-
- if (originalConf != null) {
- System.setProperty("atlas.conf", originalConf);
- } else {
- System.clearProperty("atlas.conf");
- }
-
- }
-
-
- }
-
- protected Subject loginTestUser() throws LoginException, IOException {
- LoginContext lc = new LoginContext(TEST_USER_JAAS_SECTION, new CallbackHandler() {
-
- @Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (int i = 0; i < callbacks.length; i++) {
- if (callbacks[i] instanceof PasswordCallback) {
- PasswordCallback passwordCallback = (PasswordCallback) callbacks[i];
- passwordCallback.setPassword(TESTPASS.toCharArray());
- }
- if (callbacks[i] instanceof NameCallback) {
- NameCallback nameCallback = (NameCallback) callbacks[i];
- nameCallback.setName(TESTUSER);
- }
- }
- }
- });
- // attempt authentication
- lc.login();
- return lc.getSubject();
- }
-
- protected void generateKerberosTestProperties() throws IOException, ConfigurationException {
- Properties props = new Properties();
- props.setProperty("atlas.http.authentication.enabled", "true");
- props.setProperty("atlas.http.authentication.type", "kerberos");
- props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
- props.setProperty("atlas.http.authentication.kerberos.keytab", httpKeytabFile.getAbsolutePath());
- props.setProperty("atlas.http.authentication.kerberos.name.rules",
- "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
-
- generateTestProperties(props);
- }
-
- public void setupKDCAndPrincipals() throws Exception {
- // set up the KDC
- File kdcWorkDir = startKDC();
-
- userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab");
- httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab");
-
- // create a test user principal
- kdc.createPrincipal(TESTUSER, TESTPASS);
-
- StringBuilder jaas = new StringBuilder(1024);
- jaas.append("TestUser {\n" +
- " com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
- "};\n");
- jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
- jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile));
-
- File jaasFile = new File(kdcWorkDir, "jaas.txt");
- FileUtils.write(jaasFile, jaas.toString());
- bindJVMtoJAASFile(jaasFile);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationSimpleFilterIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationSimpleFilterIT.java b/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationSimpleFilterIT.java
deleted file mode 100644
index 77659ee..0000000
--- a/webapp/src/test/java/org/apache/atlas/web/filters/MetadataAuthenticationSimpleFilterIT.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.web.filters;
-
-import org.apache.atlas.web.security.BaseSecurityTest;
-import org.apache.atlas.web.service.EmbeddedServer;
-import org.apache.commons.configuration.ConfigurationException;
-import org.eclipse.jetty.server.Server;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Properties;
-
-/**
- *
- */
-public class MetadataAuthenticationSimpleFilterIT extends BaseSecurityTest {
-
- class TestEmbeddedServer extends EmbeddedServer {
- public TestEmbeddedServer(int port, String path) throws IOException {
- super(port, path);
- }
-
- Server getServer() {
- return server;
- }
- }
-
- @Test(enabled = false)
- public void testSimpleLogin() throws Exception {
- String originalConf = System.getProperty("atlas.conf");
- System.setProperty("atlas.conf", System.getProperty("user.dir"));
- generateSimpleLoginConfiguration();
-
- TestEmbeddedServer server = new TestEmbeddedServer(23001, "webapp/target/apache-atlas");
-
- try {
- startEmbeddedServer(server.getServer());
-
- URL url = new URL("http://localhost:23001");
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- try {
- Assert.assertEquals(connection.getResponseCode(), 403);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- url = new URL("http://localhost:23001/?user.name=testuser");
- connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- Assert.assertEquals(connection.getResponseCode(), 200);
- } finally {
- server.getServer().stop();
- if (originalConf != null) {
- System.setProperty("atlas.conf", originalConf);
- } else {
- System.clearProperty("atlas.conf");
- }
- }
-
-
- }
-
- protected void generateSimpleLoginConfiguration() throws IOException, ConfigurationException {
- Properties config = new Properties();
- config.setProperty("atlas.http.authentication.enabled", "true");
- config.setProperty("atlas.http.authentication.type", "simple");
-
- generateTestProperties(config);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
index a4f8cce..08bb125 100644
--- a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
+++ b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
@@ -17,6 +17,7 @@
package org.apache.atlas.web.listeners;
import com.google.inject.Key;
+import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.thinkaurelius.titan.core.TitanGraph;
@@ -60,6 +61,11 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
}
@Override
+ protected Module getRepositoryModule() {
+ return new TestModule();
+ }
+
+ @Override
protected void startServices() {
try {
Configuration conf = ApplicationProperties.get();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/listeners/TestModule.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/listeners/TestModule.java b/webapp/src/test/java/org/apache/atlas/web/listeners/TestModule.java
new file mode 100644
index 0000000..9cb76d3
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/web/listeners/TestModule.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.listeners;
+
+import com.google.inject.Binder;
+import org.apache.atlas.RepositoryMetadataModule;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.InMemoryEntityAuditRepository;
+
+public class TestModule extends RepositoryMetadataModule {
+ @Override
+ protected void bindAuditRepository(Binder binder) {
+ //Map EntityAuditRepository interface to hbase based implementation
+ binder.bind(EntityAuditRepository.class).to(InMemoryEntityAuditRepository.class).asEagerSingleton();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
index 08f4b68..d497230 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSSLAndKerberosTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
import java.io.File;
import java.io.IOException;
@@ -51,8 +52,11 @@ public class BaseSSLAndKerberosTest extends BaseSecurityTest {
}
@Override
- public org.apache.commons.configuration.Configuration getConfiguration() {
- return super.getConfiguration();
+ protected WebAppContext getWebAppContext(String path) {
+ WebAppContext application = new WebAppContext(path, "/");
+ application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+ application.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return application;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
index e010537..270a20d 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
@@ -32,11 +32,15 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
+import java.net.URL;
import java.nio.file.Files;
import java.util.Locale;
import java.util.Properties;
-import static org.apache.atlas.security.SecurityProperties.*;
+import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
+import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
+import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
/**
*
@@ -135,4 +139,23 @@ public class BaseSecurityTest {
return configuration;
}
+ public static String writeConfiguration(final PropertiesConfiguration configuration) throws Exception {
+ String persistDir = TestUtils.getTempDirectory();
+ TestUtils.writeConfiguration(configuration, persistDir + File.separator +
+ ApplicationProperties.APPLICATION_PROPERTIES);
+
+ String confLocation = System.getProperty("atlas.conf");
+ URL url;
+ if (confLocation == null) {
+ url = BaseSecurityTest.class.getResource("/" + ApplicationProperties.APPLICATION_PROPERTIES);
+ } else {
+ url = new File(confLocation, ApplicationProperties.APPLICATION_PROPERTIES).toURI().toURL();
+ }
+ PropertiesConfiguration configuredProperties = new PropertiesConfiguration();
+ configuredProperties.load(url);
+ TestUtils.writeConfiguration(configuredProperties, persistDir + File.separator +
+ ApplicationProperties.APPLICATION_PROPERTIES);
+ ApplicationProperties.forceReload();
+ return persistDir;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/security/NegativeSSLAndKerberosTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/NegativeSSLAndKerberosTest.java b/webapp/src/test/java/org/apache/atlas/web/security/NegativeSSLAndKerberosTest.java
index 8eb8094..07802fa 100755
--- a/webapp/src/test/java/org/apache/atlas/web/security/NegativeSSLAndKerberosTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/NegativeSSLAndKerberosTest.java
@@ -22,6 +22,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.TestUtils;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
@@ -95,7 +96,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest {
System.setProperty("atlas.conf", persistDir);
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
@Override
- public PropertiesConfiguration getConfiguration() {
+ public Configuration getConfiguration() {
return configuration;
}
};
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
index aad3f3b..480861e 100755
--- a/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/SSLTest.java
@@ -18,10 +18,8 @@
package org.apache.atlas.web.security;
-import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.web.TestUtils;
import org.apache.atlas.web.service.SecureEmbeddedServer;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
@@ -30,6 +28,7 @@ import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -47,6 +46,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
private Path jksPath;
private String providerUrl;
private TestSecureEmbeddedServer secureEmbeddedServer;
+ private String originalConf;
class TestSecureEmbeddedServer extends SecureEmbeddedServer {
@@ -59,8 +59,11 @@ public class SSLTest extends BaseSSLAndKerberosTest {
}
@Override
- public org.apache.commons.configuration.Configuration getConfiguration() {
- return super.getConfiguration();
+ protected WebAppContext getWebAppContext(String path) {
+ WebAppContext application = new WebAppContext(path, "/");
+ application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+ application.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return application;
}
}
@@ -69,13 +72,9 @@ public class SSLTest extends BaseSSLAndKerberosTest {
jksPath = new Path(Files.createTempDirectory("tempproviders").toString(), "test.jks");
providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" + jksPath.toUri();
- String persistDir = TestUtils.getTempDirectory();
-
setupCredentials();
-
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
- TestUtils.writeConfiguration(configuration, persistDir + File.separator +
- ApplicationProperties.APPLICATION_PROPERTIES);
+ String persistDir = writeConfiguration(configuration);
dgiCLient = new AtlasClient(DGI_URL) {
@Override
@@ -84,6 +83,8 @@ public class SSLTest extends BaseSSLAndKerberosTest {
}
};
+ originalConf = System.getProperty("atlas.conf");
+ System.setProperty("atlas.conf", persistDir);
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
@Override
public PropertiesConfiguration getConfiguration() {
@@ -98,6 +99,10 @@ public class SSLTest extends BaseSSLAndKerberosTest {
if (secureEmbeddedServer != null) {
secureEmbeddedServer.getServer().stop();
}
+
+ if (originalConf != null) {
+ System.setProperty("atlas.conf", originalConf);
+ }
}
protected void setupCredentials() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTest.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTest.java
index 6b0a005..c356be6 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTest.java
@@ -18,8 +18,11 @@
package org.apache.atlas.web.service;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.web.TestUtils;
+import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -34,10 +37,16 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
// setup the configuration
final PropertiesConfiguration configuration = new PropertiesConfiguration();
configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl);
+ configuration.setProperty("atlas.services.enabled", false);
configuration.setProperty("atlas.notification.embedded", "false");
// setup the credential provider
setupCredentials();
+ String persistDir = BaseSecurityTest.writeConfiguration(configuration);
+ String originalConf = System.getProperty("atlas.conf");
+ System.setProperty("atlas.conf", persistDir);
+
+ ApplicationProperties.forceReload();
SecureEmbeddedServer secureEmbeddedServer = null;
try {
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@@ -45,6 +54,16 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
protected PropertiesConfiguration getConfiguration() {
return configuration;
}
+
+ @Override
+ protected WebAppContext getWebAppContext(String path) {
+ WebAppContext application = new WebAppContext(path, "/");
+ application.setDescriptor(
+ System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
+ application.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return application;
+ }
+
};
secureEmbeddedServer.server.start();
@@ -59,7 +78,12 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
Assert.fail("War deploy failed", e);
} finally {
secureEmbeddedServer.server.stop();
+
+ if (originalConf == null) {
+ System.clearProperty("atlas.conf");
+ } else {
+ System.setProperty("atlas.conf", originalConf);
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTestBase.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTestBase.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTestBase.java
index 6a56c5d..b42fc0f 100755
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTestBase.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerTestBase.java
@@ -103,8 +103,11 @@ public class SecureEmbeddedServerTestBase {
@Test
public void testNoConfiguredCredentialProvider() throws Exception {
-
+ String originalConf = null;
try {
+ originalConf = System.getProperty("atlas.conf");
+ System.clearProperty("atlas.conf");
+ ApplicationProperties.forceReload();
secureEmbeddedServer = new SecureEmbeddedServer(securePort, TestUtils.getWarPath());
secureEmbeddedServer.server.start();
@@ -113,7 +116,15 @@ public class SecureEmbeddedServerTestBase {
Assert.assertEquals(e.getMessage(),
"No credential provider path configured for storage of certificate store passwords");
} finally {
- secureEmbeddedServer.server.stop();
+ if (secureEmbeddedServer != null) {
+ secureEmbeddedServer.server.stop();
+ }
+
+ if (originalConf == null) {
+ System.clearProperty("atlas.conf");
+ } else {
+ System.setProperty("atlas.conf", originalConf);
+ }
}
}
[3/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit
with DefaultMetadataService (shwethags)
Posted by sh...@apache.org.
ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bca454e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bca454e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bca454e1
Branch: refs/heads/master
Commit: bca454e16f0b289b39ab75986e6acdca49488d04
Parents: 985465f
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Thu Mar 31 14:49:12 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Thu Mar 31 14:49:12 2016 +0530
----------------------------------------------------------------------
addons/falcon-bridge/pom.xml | 9 +-
.../apache/atlas/falcon/hook/FalconHook.java | 97 ++--------
.../atlas/publisher/FalconEventPublisher.java | 6 +-
.../apache/atlas/falcon/hook/FalconHookIT.java | 56 +++++-
addons/hive-bridge/pom.xml | 9 +-
.../org/apache/atlas/hive/hook/HiveHook.java | 33 ++--
addons/sqoop-bridge/pom.xml | 9 +-
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 73 +------
.../apache/atlas/sqoop/hook/SqoopHookIT.java | 30 ++-
addons/storm-bridge/pom.xml | 9 +-
.../apache/atlas/storm/hook/StormAtlasHook.java | 53 +-----
.../atlas/storm/hook/StormAtlasHookIT.java | 39 +++-
.../atlas/storm/hook/StormAtlasHookTest.java | 68 -------
.../org/apache/atlas/ApplicationProperties.java | 12 +-
distro/src/conf/atlas-log4j.xml | 2 +-
.../java/org/apache/atlas/hook/AtlasHook.java | 85 ++++++---
.../notification/NotificationHookConsumer.java | 23 +--
.../atlas/notification/NotificationModule.java | 7 +
.../NotificationEntityChangeListener.java | 2 +
.../notification/hook/HookNotification.java | 88 +++++----
.../NotificationHookConsumerTest.java | 37 +++-
.../notification/hook/HookNotificationTest.java | 69 ++++---
pom.xml | 5 +-
release-log.txt | 1 +
repository/pom.xml | 1 +
.../apache/atlas/RepositoryMetadataModule.java | 28 ++-
.../repository/audit/EntityAuditListener.java | 95 ++++++++++
.../repository/audit/EntityAuditRepository.java | 37 +++-
.../audit/HBaseBasedAuditRepository.java | 25 ++-
.../audit/InMemoryEntityAuditRepository.java | 59 ++++++
.../graph/GraphBackedSearchIndexer.java | 112 ++++++-----
.../atlas/services/DefaultMetadataService.java | 61 +++---
.../GraphBackedDiscoveryServiceTest.java | 7 -
.../audit/AuditRepositoryTestBase.java | 81 ++++++++
.../audit/HBaseBasedAuditRepositoryTest.java | 88 +--------
.../atlas/repository/audit/HBaseTestUtils.java | 57 ++++++
.../audit/InMemoryAuditRepositoryTest.java | 28 +++
.../service/DefaultMetadataServiceTest.java | 57 +++++-
.../DefaultMetadataServiceMockTest.java | 7 +-
server-api/pom.xml | 1 -
.../java/org/apache/atlas/RequestContext.java | 55 ++++++
.../atlas/typesystem/types/TypeSystem.java | 2 -
.../typesystem/types/TypeSystemProvider.java | 28 +++
.../main/resources/atlas-application.properties | 8 +
webapp/pom.xml | 4 +-
.../web/filters/AtlasAuthenticationFilter.java | 79 +++++++-
.../apache/atlas/web/filters/AuditFilter.java | 10 +-
.../atlas/web/listeners/GuiceServletConfig.java | 33 ++--
.../atlas/web/service/EmbeddedServer.java | 7 +-
.../NotificationHookConsumerIT.java | 12 +-
.../AtlasAuthenticationKerberosFilterIT.java | 190 +++++++++++++++++++
.../AtlasAuthenticationSimpleFilterIT.java | 98 ++++++++++
.../MetadataAuthenticationKerberosFilterIT.java | 179 -----------------
.../MetadataAuthenticationSimpleFilterIT.java | 94 ---------
.../web/listeners/TestGuiceServletConfig.java | 6 +
.../apache/atlas/web/listeners/TestModule.java | 32 ++++
.../web/security/BaseSSLAndKerberosTest.java | 8 +-
.../atlas/web/security/BaseSecurityTest.java | 25 ++-
.../security/NegativeSSLAndKerberosTest.java | 3 +-
.../org/apache/atlas/web/security/SSLTest.java | 23 ++-
.../web/service/SecureEmbeddedServerTest.java | 26 ++-
.../service/SecureEmbeddedServerTestBase.java | 15 +-
62 files changed, 1558 insertions(+), 945 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index ccdb512..ad345c5 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -205,13 +205,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
- <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
- <value>atlas-log4j.xml</value>
+ <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.file</name>
+ <value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index d4b0069..c1ab384 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -20,22 +20,17 @@ package org.apache.atlas.falcon.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
-import com.google.inject.Inject;
import com.google.inject.Injector;
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
@@ -50,8 +45,7 @@ import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.falcon.security.CurrentUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +59,7 @@ import java.util.concurrent.TimeUnit;
/**
* Falcon hook sends lineage information to the Atlas Service.
*/
-public class FalconHook extends FalconEventPublisher {
+public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
public static final String CONF_PREFIX = "atlas.hook.falcon.";
@@ -77,10 +71,6 @@ public class FalconHook extends FalconEventPublisher {
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
- public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
- private static AtlasClient atlasClient;
-
// wait time determines how long we wait before we exit the jvm on
// shutdown. Pending requests after that will not be sent.
private static final int WAIT_TIME = 3;
@@ -91,20 +81,12 @@ public class FalconHook extends FalconEventPublisher {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
- private static Configuration atlasProperties;
- @Inject
- private static NotificationInterface notifInterface;
-
- public static boolean typesRegistered = false;
-
private static boolean sync;
private static ConfigurationStore STORE;
static {
try {
- atlasProperties = ApplicationProperties.get();
-
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
@@ -130,8 +112,6 @@ public class FalconHook extends FalconEventPublisher {
// shutdown client
}
});
- atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT),
- EventUtil.getUgi(), EventUtil.getUgi().getShortUserName());
STORE = ConfigurationStore.get();
} catch (Exception e) {
@@ -166,7 +146,17 @@ public class FalconHook extends FalconEventPublisher {
private void fireAndForget(FalconEvent event) throws Exception {
LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
- notifyEntity(createEntities(event));
+ notifyEntities(getAuthenticatedUser(), createEntities(event));
+ }
+
+ private String getAuthenticatedUser() {
+ String user = null;
+ try {
+ user = CurrentUser.getAuthenticatedUser();
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
+ }
+ return getUser(user, null);
}
private List<Referenceable> createEntities(FalconEvent event) throws Exception {
@@ -179,36 +169,6 @@ public class FalconHook extends FalconEventPublisher {
}
/**
- * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
- * De-duping of entities is done on server side depending on the unique attribute on the
- *
- * @param entities entitiies to add
- */
- private void notifyEntity(List<Referenceable> entities) {
- int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
- String message = entities.toString();
-
- int numRetries = 0;
- while (true) {
- try {
- notifInterface.send(NotificationInterface.NotificationType.HOOK,
- new HookNotification.EntityCreateRequest(entities));
- return;
- } catch (Exception e) {
- numRetries++;
- if (numRetries < maxRetries) {
- LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
- } else {
- LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
- maxRetries, e);
- break;
- }
- }
- }
- }
-
-
- /**
+ * Creates process entity
+ *
+ * @param event process entity event
@@ -324,32 +284,9 @@ public class FalconHook extends FalconEventPublisher {
return entities;
}
- public synchronized void registerFalconDataModel() throws Exception {
- if (isDataModelAlreadyRegistered()) {
- LOG.info("Falcon data model is already registered!");
- return;
- }
-
- HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
- UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
- hiveMetaStoreBridge.registerHiveDataModel();
-
- FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
- LOG.info("Registering Falcon data model");
- atlasClient.createType(dataModelGenerator.getModelAsJson());
- }
-
- private boolean isDataModelAlreadyRegistered() throws Exception {
- try {
- atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
- LOG.info("Hive data model is already registered!");
- return true;
- } catch(AtlasServiceException ase) {
- if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
- return false;
- }
- throw ase;
- }
+ @Override
+ protected String getNumberOfRetriesPropertyKey() {
+ return HOOK_NUM_RETRIES;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
index 3522339..8029be9 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
@@ -24,8 +24,8 @@ import org.apache.falcon.atlas.event.FalconEvent;
/**
* Falcon publisher for Atlas
*/
-public abstract class FalconEventPublisher {
- public static class Data {
+public interface FalconEventPublisher {
+ class Data {
private FalconEvent event;
public Data(FalconEvent event) {
@@ -37,5 +37,5 @@ public abstract class FalconEventPublisher {
}
}
- public abstract void publish(final Data data) throws Exception;
+ void publish(final Data data) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index aaffa4a..4249a8f 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -18,12 +18,16 @@
package org.apache.atlas.falcon.hook;
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.atlas.service.AtlasService;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -33,6 +37,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -54,21 +60,51 @@ public class FalconHookIT {
public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String PROCESS_RESOURCE = "/process.xml";
- private AtlasClient dgiCLient;
+ private AtlasClient atlasClient;
private static final ConfigurationStore STORE = ConfigurationStore.get();
+ private Configuration atlasProperties;
@BeforeClass
public void setUp() throws Exception {
- dgiCLient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address"));
+ atlasProperties = ApplicationProperties.get();
+ atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address"));
AtlasService service = new AtlasService();
service.init();
STORE.registerListener(service);
- new FalconHook().registerFalconDataModel();
+ registerFalconDataModel();
CurrentUser.authenticate(System.getProperty("user.name"));
}
+ private void registerFalconDataModel() throws Exception {
+ if (isDataModelAlreadyRegistered()) {
+ LOG.info("Falcon data model is already registered!");
+ return;
+ }
+
+ HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
+ UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+ hiveMetaStoreBridge.registerHiveDataModel();
+
+ FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
+ LOG.info("Registering Falcon data model");
+ atlasClient.createType(dataModelGenerator.getModelAsJson());
+ }
+
+ private boolean isDataModelAlreadyRegistered() throws Exception {
+ try {
+ atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
+ LOG.info("Hive data model is already registered!");
+ return true;
+ } catch(AtlasServiceException ase) {
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ return false;
+ }
+ throw ase;
+ }
+ }
+
private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
switch (entity.getEntityType()) {
@@ -115,17 +151,17 @@ public class FalconHookIT {
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
- Referenceable processEntity = dgiCLient.getEntity(pid);
+ Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
assertEquals(processEntity.get("processName"), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
- Referenceable inEntity = dgiCLient.getEntity(inId._getId());
+ Referenceable inEntity = atlasClient.getEntity(inId._getId());
assertEquals(inEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
- Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+ Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
}
@@ -173,12 +209,12 @@ public class FalconHookIT {
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
- Referenceable processEntity = dgiCLient.getEntity(pid);
+ Referenceable processEntity = atlasClient.getEntity(pid);
assertEquals(processEntity.get("processName"), process.getName());
assertNull(processEntity.get("inputs"));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
- Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+ Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
}
@@ -209,13 +245,13 @@ public class FalconHookIT {
waitFor(2000000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- JSONArray results = dgiCLient.search(query);
+ JSONArray results = atlasClient.search(query);
System.out.println(results);
return results.length() == 1;
}
});
- JSONArray results = dgiCLient.search(query);
+ JSONArray results = atlasClient.search(query);
JSONObject row = results.getJSONObject(0).getJSONObject("t");
return row.getString("id");
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index adb4f3a..8bfbb13 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -283,13 +283,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
- <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
- <value>atlas-log4j.xml</value>
+ <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.file</name>
+ <value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 16ed452..f313f2e 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -20,14 +20,12 @@ package org.apache.atlas.hive.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -86,8 +84,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
- private static Configuration atlasProperties;
-
class HiveEvent {
public Set<ReadEntity> inputs;
public Set<WriteEntity> outputs;
@@ -108,8 +104,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
static {
try {
- atlasProperties = ApplicationProperties.get();
-
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
@@ -166,7 +160,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
event.inputs = hookContext.getInputs();
event.outputs = hookContext.getOutputs();
- event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
+ event.user = getUser(hookContext.getUserName(), hookContext.getUgi());
event.ugi = hookContext.getUgi();
event.operation = OPERATION_MAP.get(hookContext.getOperationName());
event.hookType = hookContext.getHookType();
@@ -258,7 +252,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Type.DATABASE) {
//Create/update table entity
- createOrUpdateEntities(dgiBridge, writeEntity);
+ createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
@@ -271,7 +265,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Below check should filter out partition related
if (writeEntity.getType() == Entity.Type.TABLE) {
//Create/update table entity
- createOrUpdateEntities(dgiBridge, writeEntity);
+ createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
@@ -292,7 +286,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
.equals(oldTable.getTableName())) {
//Create/update old table entity - create new entity and replace id
- Referenceable tableEntity = createOrUpdateEntities(dgiBridge, writeEntity);
+ Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
oldTable.getDbName(), oldTable.getTableName());
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
@@ -304,14 +298,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
- messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(),
- HiveDataModelGenerator.NAME, oldQualifiedName, newEntity));
+ messages.add(new HookNotification.EntityPartialUpdateRequest(event.user,
+ HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
+ oldQualifiedName, newEntity));
}
}
}
}
- private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
+ private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
@@ -351,14 +346,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
entities.add(partitionEntity);
}
- messages.add(new HookNotification.EntityUpdateRequest(entities));
+ messages.add(new HookNotification.EntityUpdateRequest(user, entities));
return tableEntity;
}
private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
for (WriteEntity entity : event.outputs) {
if (entity.getType() == entityType) {
- createOrUpdateEntities(dgiBridge, entity);
+ createOrUpdateEntities(dgiBridge, event.user, entity);
}
}
}
@@ -396,7 +391,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
- Referenceable inTable = createOrUpdateEntities(dgiBridge, readEntity);
+ Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
source.add(inTable);
}
}
@@ -405,7 +400,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
- Referenceable outTable = createOrUpdateEntities(dgiBridge, writeEntity);
+ Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
target.add(outTable);
}
}
@@ -417,7 +412,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//TODO set
processReferenceable.set("queryGraph", "queryGraph");
- messages.add(new HookNotification.EntityCreateRequest(processReferenceable));
+ messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
}
@@ -432,6 +427,4 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return new JSONObject();
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 0927c8f..343bb4e 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -288,13 +288,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
- <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
- <value>atlas-log4j.xml</value>
+ <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.file</name>
+ <value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index b573ac4..924e467 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -19,31 +19,24 @@
package org.apache.atlas.sqoop.hook;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.apache.sqoop.util.ImportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -55,43 +48,16 @@ import java.util.Properties;
public class SqoopHook extends SqoopJobDataPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
- private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
public static final String CONF_PREFIX = "atlas.hook.sqoop.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
- public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
-
- @Inject
- private static NotificationInterface notifInterface;
static {
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
}
- synchronized void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
- // Make sure hive model exists
- HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
- UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
- hiveMetaStoreBridge.registerHiveDataModel();
- SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
-
- //Register sqoop data model if its not already registered
- try {
- client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
- LOG.info("Sqoop data model is already registered!");
- } catch(AtlasServiceException ase) {
- if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
- //Expected in case types do not exist
- LOG.info("Registering Sqoop data model");
- client.createType(dataModelGenerator.getModelAsJson());
- } else {
- throw ase;
- }
- }
- }
-
public Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
@@ -182,12 +148,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
@Override
public void publish(SqoopJobDataPublisher.Data data) throws Exception {
- Injector injector = Guice.createInjector(new NotificationModule());
- notifInterface = injector.getInstance(NotificationInterface.class);
-
Configuration atlasProperties = ApplicationProperties.get();
- AtlasClient atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_REST_ADDRESS, DEFAULT_DGI_URL),
- UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser().getShortUserName());
org.apache.hadoop.conf.Configuration sqoopConf = new org.apache.hadoop.conf.Configuration();
String clusterName = sqoopConf.get(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
@@ -197,33 +158,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
data.getHiveTable(), data.getHiveDB());
Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
- notifyEntity(atlasProperties, dbStoreRef, dbRef, hiveTableRef, procRef);
- }
-
- /**
- * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
- * De-duping of entities is done on server side depending on the unique attribute on the
- * @param entities - Entity references to publish.
- */
- private void notifyEntity(Configuration atlasProperties, Referenceable... entities) {
int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-
- int numRetries = 0;
- while (true) {
- try {
- notifInterface.send(NotificationInterface.NotificationType.HOOK,
- new HookNotification.EntityCreateRequest(entities));
- return;
- } catch(Exception e) {
- numRetries++;
- if(numRetries < maxRetries) {
- LOG.debug("Failed to notify atlas for entity {}. Retrying", entities, e);
- } else {
- LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", entities,
- maxRetries, e);
- break;
- }
- }
- }
+ HookNotification.HookNotificationMessage message =
+ new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
+ AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
index 94cd105..0e4658a 100644
--- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
+++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
@@ -18,11 +18,17 @@
package org.apache.atlas.sqoop.hook;
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
@@ -44,7 +50,29 @@ public class SqoopHookIT {
//Set-up sqoop session
Configuration configuration = ApplicationProperties.get();
dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address"));
- new SqoopHook().registerDataModels(dgiCLient, configuration);
+ registerDataModels(dgiCLient, configuration);
+ }
+
+ private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
+ // Make sure hive model exists
+ HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
+ UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+ hiveMetaStoreBridge.registerHiveDataModel();
+ SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
+
+ //Register sqoop data model if its not already registered
+ try {
+ client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
+ LOG.info("Sqoop data model is already registered!");
+ } catch(AtlasServiceException ase) {
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ //Expected in case types do not exist
+ LOG.info("Registering Sqoop data model");
+ client.createType(dataModelGenerator.getModelAsJson());
+ } else {
+ throw ase;
+ }
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 76c4507..e3b4ed7 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -318,13 +318,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
- <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
- <value>atlas-log4j.xml</value>
+ <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.file</name>
+ <value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 4c0004b..620f929 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -24,20 +24,13 @@ import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.Utils;
-import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -70,15 +63,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default";
- private static volatile boolean typesRegistered = false;
-
- public StormAtlasHook() {
- super();
- }
-
- StormAtlasHook(AtlasClient atlasClient) {
- super(atlasClient);
- }
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
@@ -113,7 +97,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
entities.add(topologyReferenceable);
LOG.debug("notifying entities, size = {}", entities.size());
- notifyEntities(entities);
+ String user = getUser(topologyInfo.get_owner(), null);
+ notifyEntities(user, entities);
} catch (Exception e) {
throw new RuntimeException("Atlas hook is unable to process the topology.", e);
}
@@ -379,38 +364,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
}
- public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
- AtlasServiceException {
-
- try {
- atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
- LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
- } catch(AtlasServiceException ase) {
- if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
- //Expected in case types do not exist
- LOG.info("Registering Hive data model");
- atlasClient.createType(dataModelGenerator.getModelAsJson());
- } else {
- throw ase;
- }
- }
-
-
- try {
- atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
- } catch(AtlasServiceException ase) {
- if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
- LOG.info("Registering Storm/Kafka data model");
- StormDataModel.main(new String[]{});
- TypesDef typesDef = StormDataModel.typesDef();
- String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
- LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
- atlasClient.createType(stormTypesAsJSON);
- }
- }
- typesRegistered = true;
- }
-
private String getClusterName(Map stormConf) {
String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME;
if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) {
@@ -418,6 +371,4 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
}
return clusterName;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
index 79f1b07..4648d24 100644
--- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
+++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java
@@ -20,9 +20,13 @@ package org.apache.atlas.storm.hook;
import backtype.storm.ILocalCluster;
import backtype.storm.generated.StormTopology;
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
@@ -57,9 +61,40 @@ public class StormAtlasHookIT {
Configuration configuration = ApplicationProperties.get();
atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL));
- new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
+ registerDataModel(new HiveDataModelGenerator());
}
+ private void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
+ AtlasServiceException {
+ try {
+ atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
+ LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
+ } catch(AtlasServiceException ase) {
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ //Expected in case types do not exist
+ LOG.info("Registering Hive data model");
+ atlasClient.createType(dataModelGenerator.getModelAsJson());
+ } else {
+ throw ase;
+ }
+ }
+
+
+ try {
+ atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
+ } catch(AtlasServiceException ase) {
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ LOG.info("Registering Storm/Kafka data model");
+ StormDataModel.main(new String[]{});
+ TypesDef typesDef = StormDataModel.typesDef();
+ String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
+ LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
+ atlasClient.createType(stormTypesAsJSON);
+ }
+ }
+ }
+
+
@AfterClass
public void tearDown() throws Exception {
LOG.info("Shutting down storm local cluster");
@@ -76,7 +111,7 @@ public class StormAtlasHookIT {
String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
- new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
+ registerDataModel(new HiveDataModelGenerator());
// verify types are registered
for (StormDataTypes stormDataType : StormDataTypes.values()) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
deleted file mode 100644
index 51840a5..0000000
--- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.storm.hook;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
-import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.storm.model.StormDataTypes;
-import org.testng.annotations.Test;
-
-import static org.mockito.Matchers.contains;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@Test
-public class StormAtlasHookTest {
-
- @Test
- public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException {
- AtlasClient atlasClient = mock(AtlasClient.class);
- HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
- AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
- when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
- when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException);
- String hiveModel = "{hive_model_as_json}";
- when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel);
-
- StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
- stormAtlasHook.registerDataModel(dataModelGenerator);
-
- verify(atlasClient).createType(hiveModel);
- }
-
- @Test
- public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException {
- AtlasClient atlasClient = mock(AtlasClient.class);
- HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
- when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition");
- AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
- when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
- when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException);
-
- StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
- stormAtlasHook.registerDataModel(dataModelGenerator);
-
- verify(atlasClient).createType(contains("storm_topology"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/common/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/ApplicationProperties.java b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
index d74a30e..ca72ffd 100644
--- a/common/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -35,12 +35,22 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String APPLICATION_PROPERTIES = "atlas-application.properties";
- private static Configuration instance = null;
+ private static volatile Configuration instance = null;
private ApplicationProperties(URL url) throws ConfigurationException {
super(url);
}
+ public static void forceReload() {
+ if (instance != null) {
+ synchronized (ApplicationProperties.class) {
+ if (instance != null) {
+ instance = null;
+ }
+ }
+ }
+ }
+
public static Configuration get() throws AtlasException {
if (instance == null) {
synchronized (ApplicationProperties.class) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 6071703..1ac4082 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -55,7 +55,7 @@
<appender-ref ref="FILE"/>
</logger>
- <logger name="AUDIT">
+ <logger name="AUDIT" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 2e41c5c..7e09a19 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -19,20 +19,21 @@
package org.apache.atlas.hook;
import com.google.inject.Guice;
-import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -44,25 +45,19 @@ import java.util.List;
public abstract class AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
- private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
-
- public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
- protected final AtlasClient atlasClient;
/**
* Hadoop Cluster name for this instance, typically used for namespace.
*/
protected static Configuration atlasProperties;
- @Inject
protected static NotificationInterface notifInterface;
static {
try {
atlasProperties = ApplicationProperties.get();
} catch (Exception e) {
- LOG.info("Attempting to send msg while shutdown in progress.", e);
+ LOG.info("Failed to load application properties", e);
}
Injector injector = Guice.createInjector(new NotificationModule());
@@ -71,18 +66,9 @@ public abstract class AtlasHook {
LOG.info("Created Atlas Hook");
}
- public AtlasHook() {
- this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL)));
- }
-
- public AtlasHook(AtlasClient atlasClient) {
- this.atlasClient = atlasClient;
- //TODO - take care of passing in - ugi, doAsUser for secure cluster
- }
-
protected abstract String getNumberOfRetriesPropertyKey();
- protected void notifyEntities(Collection<Referenceable> entities) {
+ protected void notifyEntities(String user, Collection<Referenceable> entities) {
JSONArray entitiesArray = new JSONArray();
for (Referenceable entity : entities) {
@@ -92,27 +78,26 @@ public abstract class AtlasHook {
}
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
- hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray));
+ hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entitiesArray));
notifyEntities(hookNotificationMessages);
}
/**
- * Notify atlas
- * of the entity through message. The entity can be a
+ * Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the
* unique attribute on the entities.
*
- * @param entities entities
+ * @param messages hook notification messages
+ * @param maxRetries maximum number of retries while sending message to messaging system
*/
- protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) {
- final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
- final String message = entities.toString();
+ public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
+ final String message = messages.toString();
int numRetries = 0;
while (true) {
try {
- notifInterface.send(NotificationInterface.NotificationType.HOOK, entities);
+ notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
return;
} catch(Exception e) {
numRetries++;
@@ -125,4 +110,50 @@ public abstract class AtlasHook {
}
}
}
+
+ /**
+ * Notify atlas of the entity through message. The entity can be a
+ * complex entity with reference to other entities.
+ * De-duping of entities is done on server side depending on the
+ * unique attribute on the entities.
+ *
+ * @param messages hook notification messages
+ */
+ protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
+ final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
+ notifyEntities(messages, maxRetries);
+ }
+
+ /**
+ * Returns the logged in user.
+ * @return
+ */
+ public static String getUser() {
+ return getUser(null, null);
+ }
+
+ /**
+ * Returns the user. Order of preference:
+ * 1. Given userName
+ * 2. ugi.getShortUserName()
+ * 3. UserGroupInformation.getCurrentUser().getShortUserName()
+ * 4. System.getProperty("user.name")
+ */
+
+ public static String getUser(String userName, UserGroupInformation ugi) {
+ if (StringUtils.isNotEmpty(userName)) {
+ return userName;
+ }
+
+ if (ugi != null && StringUtils.isNotEmpty(ugi.getShortUserName())) {
+ return ugi.getShortUserName();
+ }
+
+ try {
+ return UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ LOG.warn("Failed for UserGroupInformation.getCurrentUser()");
+ return System.getProperty("user.name");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 015af44..2fcbcd3 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -26,6 +26,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +49,13 @@ public class NotificationHookConsumer implements Service {
@Inject
private NotificationInterface notificationInterface;
private ExecutorService executors;
- private AtlasClient atlasClient;
+ private String atlasEndpoint;
@Override
public void start() throws AtlasException {
Configuration applicationProperties = ApplicationProperties.get();
- String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
- atlasClient = new AtlasClient(atlasEndpoint);
+ atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
@@ -87,15 +87,8 @@ public class NotificationHookConsumer implements Service {
class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
- private final AtlasClient client;
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
- this(atlasClient, consumer);
- }
-
- public HookConsumer(AtlasClient client,
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
- this.client = client;
this.consumer = consumer;
}
@@ -118,6 +111,9 @@ public class NotificationHookConsumer implements Service {
try {
if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next();
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
+ AtlasClient atlasClient = getAtlasClient(ugi);
+
try {
switch (message.getType()) {
case ENTITY_CREATE:
@@ -154,9 +150,14 @@ public class NotificationHookConsumer implements Service {
}
}
+ protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+ return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+ }
+
boolean serverAvailable(Timer timer) {
try {
- while (!client.isServerReady()) {
+ AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
+ while (!atlasClient.isServerReady()) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
SERVER_READY_WAIT_TIME_MS);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
index c20fdf1..e8ae177 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
@@ -22,6 +22,8 @@ import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.KafkaNotificationProvider;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.service.Service;
/**
@@ -37,5 +39,10 @@ public class NotificationModule extends AbstractModule {
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(KafkaNotification.class);
serviceBinder.addBinding().to(NotificationHookConsumer.class);
+
+ //Add NotificationEntityChangeListener as EntityChangeListener
+ Multibinder<EntityChangeListener> entityChangeListenerBinder =
+ Multibinder.newSetBinder(binder(), EntityChangeListener.class);
+ entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
index 31f5c2b..300cbb5 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.notification.entity;
+import com.google.inject.Inject;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface;
@@ -48,6 +49,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
* @param notificationInterface the notification framework interface
* @param typeSystem the Atlas type system
*/
+ @Inject
public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
this.notificationInterface = notificationInterface;
this.typeSystem = typeSystem;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
index a000161..4c7f6de 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParseException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -41,29 +42,24 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
@Override
public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) {
- if (json.isJsonArray()) {
- JSONArray jsonArray = context.deserialize(json, JSONArray.class);
- return new EntityCreateRequest(jsonArray);
- } else {
- HookNotificationType type =
- context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
- switch (type) {
- case ENTITY_CREATE:
- return context.deserialize(json, EntityCreateRequest.class);
-
- case ENTITY_FULL_UPDATE:
- return context.deserialize(json, EntityUpdateRequest.class);
-
- case ENTITY_PARTIAL_UPDATE:
- return context.deserialize(json, EntityPartialUpdateRequest.class);
-
- case TYPE_CREATE:
- case TYPE_UPDATE:
- return context.deserialize(json, TypeRequest.class);
-
- default:
- throw new IllegalStateException("Unhandled type " + type);
- }
+ HookNotificationType type =
+ context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
+ switch (type) {
+ case ENTITY_CREATE:
+ return context.deserialize(json, EntityCreateRequest.class);
+
+ case ENTITY_FULL_UPDATE:
+ return context.deserialize(json, EntityUpdateRequest.class);
+
+ case ENTITY_PARTIAL_UPDATE:
+ return context.deserialize(json, EntityPartialUpdateRequest.class);
+
+ case TYPE_CREATE:
+ case TYPE_UPDATE:
+ return context.deserialize(json, TypeRequest.class);
+
+ default:
+ throw new IllegalStateException("Unhandled type " + type);
}
}
@@ -78,18 +74,30 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
* Base type of hook message.
*/
public static class HookNotificationMessage {
+ public static final String UNKNOW_USER = "UNKNOWN";
protected HookNotificationType type;
+ protected String user;
private HookNotificationMessage() {
}
- public HookNotificationMessage(HookNotificationType type) {
+ public HookNotificationMessage(HookNotificationType type, String user) {
this.type = type;
+ this.user = user;
}
public HookNotificationType getType() {
return type;
}
+
+ public String getUser() {
+ if (StringUtils.isEmpty(user)) {
+ return UNKNOW_USER;
+ }
+ return user;
+ }
+
+
}
/**
@@ -101,8 +109,8 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private TypeRequest() {
}
- public TypeRequest(HookNotificationType type, TypesDef typesDef) {
- super(type);
+ public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
+ super(type, user);
this.typesDef = typesDef;
}
@@ -120,21 +128,21 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private EntityCreateRequest() {
}
- public EntityCreateRequest(Referenceable... entities) {
- this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities));
+ public EntityCreateRequest(String user, Referenceable... entities) {
+ this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
}
- public EntityCreateRequest(List<Referenceable> entities) {
- this(HookNotificationType.ENTITY_CREATE, entities);
+ public EntityCreateRequest(String user, List<Referenceable> entities) {
+ this(HookNotificationType.ENTITY_CREATE, entities, user);
}
- protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
- super(type);
+ protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
+ super(type, user);
this.entities = entities;
}
- public EntityCreateRequest(JSONArray jsonArray) {
- super(HookNotificationType.ENTITY_CREATE);
+ public EntityCreateRequest(String user, JSONArray jsonArray) {
+ super(HookNotificationType.ENTITY_CREATE, user);
entities = new ArrayList<>();
for (int index = 0; index < jsonArray.length(); index++) {
try {
@@ -154,12 +162,12 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
* Hook message for updating entities(full update).
*/
public static class EntityUpdateRequest extends EntityCreateRequest {
- public EntityUpdateRequest(Referenceable... entities) {
- this(Arrays.asList(entities));
+ public EntityUpdateRequest(String user, Referenceable... entities) {
+ this(user, Arrays.asList(entities));
}
- public EntityUpdateRequest(List<Referenceable> entities) {
- super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
+ public EntityUpdateRequest(String user, List<Referenceable> entities) {
+ super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
}
}
@@ -175,9 +183,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private EntityPartialUpdateRequest() {
}
- public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
+ public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue,
Referenceable entity) {
- super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
+ super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
this.typeName = typeName;
this.attribute = attribute;
this.attributeValue = attributeValue;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b3d4721..02255a7 100644
--- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
@@ -29,10 +30,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
- AtlasClient atlasClient = mock(AtlasClient.class);
+ final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+ @Override
+ protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+ return atlasClient;
+ }
+ };
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(true);
@@ -43,10 +49,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
- AtlasClient atlasClient = mock(AtlasClient.class);
+ final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+ @Override
+ protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+ return atlasClient;
+ }
+ };
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
@@ -57,10 +68,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
- AtlasClient atlasClient = mock(AtlasClient.class);
+ final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+ @Override
+ protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+ return atlasClient;
+ }
+ };
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(atlasClient.isServerReady()).thenReturn(false);
@@ -70,10 +86,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
- AtlasClient atlasClient = mock(AtlasClient.class);
+ final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
+ @Override
+ protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
+ return atlasClient;
+ }
+ };
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
new Exception()));
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index 1dedb5b..11b7a53 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -19,49 +19,74 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
public class HookNotificationTest {
-
- @Test
- public void testMessageBackwardCompatibility() throws Exception {
- JSONArray jsonArray = new JSONArray();
- Referenceable entity = new Referenceable("sometype");
- entity.set("name", "somename");
- String entityJson = InstanceSerialization.toJson(entity, true);
- jsonArray.put(entityJson);
-
- HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
- jsonArray.toString(), HookNotification.HookNotificationMessage.class);
- assertNotNull(notification);
- assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
- HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
- assertEquals(createRequest.getEntities().size(), 1);
- assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
- }
-
@Test
public void testNewMessageSerDe() throws Exception {
Referenceable entity1 = new Referenceable("sometype");
entity1.set("attr", "value");
entity1.set("complex", new Referenceable("othertype"));
Referenceable entity2 = new Referenceable("newtype");
- HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2);
+ String user = "user";
+ HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
notificationJson, HookNotification.HookNotificationMessage.class);
assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ assertEquals(actualNotification.getUser(), user);
+
HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
assertEquals(createRequest.getEntities().size(), 2);
+
Referenceable actualEntity1 = createRequest.getEntities().get(0);
assertEquals(actualEntity1.getTypeName(), "sometype");
assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
}
+
+ @Test
+ public void testBackwardCompatibility() throws Exception {
+ /**
+ Referenceable entity = new Referenceable("sometype");
+ entity.set("attr", "value");
+ String user = "user";
+ HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
+
+ String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
+ System.out.println(notificationJson);
+ **/
+
+ //Json without user and assert that the string can be deserialised
+ String notificationJson = "{\n"
+ + " \"entities\": [\n"
+ + " {\n"
+ + " \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference\",\n"
+ + " \"id\": {\n"
+ + " \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Id\",\n"
+ + " \"id\": \"-1457685864305243000\",\n"
+ + " \"version\": 0,\n"
+ + " \"typeName\": \"sometype\"\n"
+ + " },\n"
+ + " \"typeName\": \"sometype\",\n"
+ + " \"values\": {\n"
+ + " \"attr\": \"value\"\n"
+ + " },\n"
+ + " \"traitNames\": [],\n"
+ + " \"traits\": {}\n"
+ + " }\n"
+ + " ],\n"
+ + " \"type\": \"ENTITY_CREATE\"\n"
+ + "}";
+
+ HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
+ notificationJson, HookNotification.HookNotificationMessage.class);
+ assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ assertNull(actualNotification.user);
+ assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
+ }
}
[2/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit
with DefaultMetadataService (shwethags)
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5be00f5..c6d82aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1474,6 +1474,8 @@
<user.dir>${project.basedir}</user.dir>
<atlas.data>${project.build.directory}/data</atlas.data>
<log4j.configuration>atlas-log4j.xml</log4j.configuration>
+ <zookeeper.client.secure>false</zookeeper.client.secure>
+ <zookeeper.sasl.client>false</zookeeper.sasl.client>
</systemProperties>
<skipTests>${skipTests}</skipTests>
<forkMode>always</forkMode>
@@ -1483,9 +1485,6 @@
-Xmx1024m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true
</argLine>
<skip>${skipUTs}</skip>
- <excludes>
- <exclude>**/*Base*</exclude>
- </excludes>
</configuration>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4569e55..aaef9e3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)
ATLAS-408 UI : Add a close link (x) on the top right when Tag is added (darshankumar89 via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index 6502bba..eca087a 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -149,6 +149,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 7651bc7..4a02b0d 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -18,6 +18,7 @@
package org.apache.atlas;
+import com.google.inject.Binder;
import com.google.inject.Singleton;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
@@ -27,21 +28,26 @@ import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService;
-import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.audit.EntityAuditListener;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.repository.graph.TitanGraphProvider;
import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.service.Service;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeSystemProvider;
/**
* Guice module for Repository module.
@@ -51,9 +57,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
@Override
protected void configure() {
// special wiring for Titan Graph
-
-
-
ThrowingProviderBinder.create(binder()).bind(GraphProvider.class, TitanGraph.class).to(TitanGraphProvider.class)
.asEagerSingleton();
@@ -61,7 +64,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton();
- bind(TypeSystem.class).in(Singleton.class);
+ bind(TypeSystem.class).toProvider(TypeSystemProvider.class).in(Singleton.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
@@ -80,9 +83,24 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
+ bindAuditRepository(binder());
+
+ //Add EntityAuditListener as EntityChangeListener
+ Multibinder<EntityChangeListener> entityChangeListenerBinder =
+ Multibinder.newSetBinder(binder(), EntityChangeListener.class);
+ entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
+
MethodInterceptor interceptor = new GraphTransactionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
}
+ protected void bindAuditRepository(Binder binder) {
+ //Map EntityAuditRepository interface to hbase based implementation
+ binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton();
+
+ //Add HBaseBasedAuditRepository to service so that connection is closed at shutdown
+ Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
+ serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
new file mode 100644
index 0000000..0c5c551
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
+ */
+public class EntityAuditListener implements EntityChangeListener {
+ private EntityAuditRepository auditRepository;
+
+ @Inject
+ public EntityAuditListener(EntityAuditRepository auditRepository) {
+ this.auditRepository = auditRepository;
+ }
+
+ @Override
+ public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+ List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ for (ITypedReferenceableInstance entity : entities) {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
+ EntityAuditRepository.EntityAuditAction.ENTITY_CREATE,
+ "Created: " + InstanceSerialization.toJson(entity, true));
+ events.add(event);
+ }
+ auditRepository.putEvents(events);
+ }
+
+ private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
+ EntityAuditRepository.EntityAuditAction action,
+ String details) {
+ return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(),
+ action, details);
+ }
+
+ @Override
+ public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+
+ }
+
+ @Override
+ public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+ EntityAuditRepository.EntityAuditAction.TAG_ADD,
+ "Added trait: " + InstanceSerialization.toJson(trait, true));
+ auditRepository.putEvents(event);
+ }
+
+ @Override
+ public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+ EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
+ auditRepository.putEvents(event);
+ }
+
+ @Override
+ public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+ List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ for (ITypedReferenceableInstance entity : entities) {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
+ EntityAuditRepository.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
+ events.add(event);
+ }
+ auditRepository.putEvents(events);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index a5b4a59..d41c4da 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -27,6 +27,10 @@ import java.util.List;
* Interface for repository for storing entity audit events
*/
public interface EntityAuditRepository {
+ enum EntityAuditAction {
+ ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE;
+ }
+
/**
* Structure of entity audit event
*/
@@ -34,13 +38,13 @@ public interface EntityAuditRepository {
String entityId;
Long timestamp;
String user;
- String action;
+ EntityAuditAction action;
String details;
public EntityAuditEvent() {
}
- public EntityAuditEvent(String entityId, long ts, String user, String action, String details) {
+ public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
@@ -61,7 +65,7 @@ public interface EntityAuditRepository {
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp.longValue() == otherEvent.timestamp.longValue()) &&
- StringUtils.equals(user, otherEvent.user) && StringUtils.equals(action, otherEvent.action) &&
+ StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
StringUtils.equals(details, otherEvent.details);
}
@@ -77,6 +81,26 @@ public interface EntityAuditRepository {
.append(user).append(";Action=").append(action).append(";Details=").append(details);
return builder.toString();
}
+
+ public String getEntityId() {
+ return entityId;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public EntityAuditAction getAction() {
+ return action;
+ }
+
+ public String getDetails() {
+ return details;
+ }
}
/**
@@ -87,6 +111,13 @@ public interface EntityAuditRepository {
void putEvents(EntityAuditEvent... events) throws AtlasException;
/**
+ * Add events to the event repository
+ * @param events events to be added
+ * @throws AtlasException
+ */
+ void putEvents(List<EntityAuditEvent> events) throws AtlasException;
+
+ /**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
* @param entityId entity id
* @param ts starting timestamp for events
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 8b92792..ae6e988 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -80,16 +81,29 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @param events events to be added
* @throws AtlasException
*/
+ @Override
public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
- LOG.info("Putting {} events", events.length);
+ putEvents(Arrays.asList(events));
+ }
+
+ @Override
+ /**
+ * Add events to the event repository
+ * @param events events to be added
+ * @throws AtlasException
+ */
+ public void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+ LOG.info("Putting {} events", events.size());
Table table = null;
try {
table = connection.getTable(tableName);
- List<Put> puts = new ArrayList<>(events.length);
+ List<Put> puts = new ArrayList<>(events.size());
for (EntityAuditRepository.EntityAuditEvent event : events) {
LOG.debug("Adding entity audit event {}", event);
Put put = new Put(getKey(event.entityId, event.timestamp));
- addColumn(put, COLUMN_ACTION, event.action);
+ if (event.action != null) {
+ put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal()));
+ }
addColumn(put, COLUMN_USER, event.user);
addColumn(put, COLUMN_DETAIL, event.details);
puts.add(put);
@@ -145,7 +159,8 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
String key = Bytes.toString(result.getRow());
EntityAuditRepository.EntityAuditEvent event = fromKey(key);
event.user = getResultString(result, COLUMN_USER);
- event.action = getResultString(result, COLUMN_ACTION);
+ event.action =
+ EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))];
event.details = getResultString(result, COLUMN_DETAIL);
events.add(event);
}
@@ -189,7 +204,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @throws AtlasException
* @param atlasConf
*/
- public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
+ public static org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
Configuration subsetAtlasConf =
ApplicationProperties.getSubsetConfiguration(atlasConf, CONFIG_PREFIX);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
new file mode 100644
index 0000000..df75290
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Entity audit repository where audit events are stored in-memory. Used only for integration tests
+ */
+public class InMemoryEntityAuditRepository implements EntityAuditRepository {
+ private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>();
+
+ @Override
+ public void putEvents(EntityAuditEvent... events) throws AtlasException {
+ putEvents(Arrays.asList(events));
+ }
+
+ @Override
+ public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+ for (EntityAuditEvent event : events) {
+ auditEvents.put(event.entityId + (Long.MAX_VALUE - event.timestamp), event);
+ }
+ }
+
+ @Override
+ public List<EntityAuditEvent> listEvents(String entityId, Long ts, short maxResults)
+ throws AtlasException {
+ List<EntityAuditEvent> events = new ArrayList<>();
+ SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(entityId + (Long.MAX_VALUE - ts));
+ for (EntityAuditEvent event : subMap.values()) {
+ if (events.size() < maxResults && event.entityId.equals(entityId)) {
+ events.add(event);
+ }
+ }
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3ea5fde..7eccc58 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -59,8 +59,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
private final TitanGraph titanGraph;
- private TitanManagement management;
-
List<Class> MIXED_INDEX_EXCLUSIONS = new ArrayList() {{
add(Boolean.class);
add(BigDecimal.class);
@@ -68,57 +66,63 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}};
@Inject
- public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException {
+ public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException,
+ IndexException {
this.titanGraph = graphProvider.get();
/* Create the transaction for indexing.
*/
- management = titanGraph.getManagementSystem();
initialize();
}
/**
* Initializes the indices for the graph - create indices for Global Vertex Keys
*/
- private void initialize() {
- if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
- LOG.info("Global indexes already exist for graph");
- return;
- }
+ private void initialize() throws RepositoryException, IndexException {
+ TitanManagement management = titanGraph.getManagementSystem();
+ try {
+ if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
+ LOG.info("Global indexes already exist for graph");
+ return;
+ }
/* This is called only once, which is the first time Atlas types are made indexable .*/
- LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
- management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
- management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
+ LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
+ management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
+ management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
- // create a composite index for guid as its unique
- createCompositeAndMixedIndex(Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+ // create a composite index for guid as its unique
+ createCompositeAndMixedIndex(management, Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
- // create a composite and mixed index for type since it can be combined with other keys
- createCompositeAndMixedIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+ // create a composite and mixed index for type since it can be combined with other keys
+ createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
+ true);
- // create a composite and mixed index for type since it can be combined with other keys
- createCompositeAndMixedIndex(Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+ // create a composite and mixed index for type since it can be combined with other keys
+ createCompositeAndMixedIndex(management, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+ true);
- // create a composite and mixed index for traitNames since it can be combined with other
- // keys. Traits must be a set and not a list.
- createCompositeAndMixedIndex(Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+ // create a composite and mixed index for traitNames since it can be combined with other
+ // keys. Traits must be a set and not a list.
+ createCompositeAndMixedIndex(management, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+ true);
- // Index for full text search
- createFullTextIndex();
+ // Index for full text search
+ createFullTextIndex(management);
- //Indexes for graph backed type system store
- createTypeStoreIndexes();
+ //Indexes for graph backed type system store
+ createTypeStoreIndexes(management);
- management.commit();
- //Make sure we acquire another transaction after commit for subsequent indexing
- management = titanGraph.getManagementSystem();
-
- LOG.info("Index creation for global keys complete.");
+ commit(management);
+ LOG.info("Index creation for global keys complete.");
+ } catch (Throwable t) {
+ rollback(management);
+ throw new RepositoryException(t);
+ }
}
- private void createFullTextIndex() {
+ private void createFullTextIndex(TitanManagement management) {
PropertyKey fullText =
management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY).dataType(String.class).make();
@@ -128,12 +132,14 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
LOG.info("Created mixed index for {}", Constants.ENTITY_TEXT_PROPERTY_KEY);
}
- private void createTypeStoreIndexes() {
+ private void createTypeStoreIndexes(TitanManagement management) {
//Create unique index on typeName
- createCompositeAndMixedIndex(Constants.TYPENAME_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+ createCompositeAndMixedIndex(management, Constants.TYPENAME_PROPERTY_KEY, String.class, true,
+ Cardinality.SINGLE, true);
//create index on vertex type
- createCompositeAndMixedIndex(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+ createCompositeAndMixedIndex(management, Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false,
+ Cardinality.SINGLE, true);
}
/**
@@ -144,21 +150,22 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
*/
@Override
public void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException {
-
+ TitanManagement management = titanGraph.getManagementSystem();
for (IDataType dataType : dataTypes) {
LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
try {
- addIndexForType(dataType);
+ addIndexForType(management, dataType);
LOG.info("Index creation for type {} complete", dataType.getName());
} catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable);
//Rollback indexes if any failure
- rollback();
+ rollback(management);
throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
}
}
+
//Commit indexes
- commit();
+ commit(management);
}
@Override
@@ -166,7 +173,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
onAdd(dataTypes);
}
- private void addIndexForType(IDataType dataType) {
+ private void addIndexForType(TitanManagement management, IDataType dataType) {
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
case ENUM:
@@ -178,17 +185,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case STRUCT:
StructType structType = (StructType) dataType;
- createIndexForFields(structType, structType.fieldMapping().fields);
+ createIndexForFields(management, structType, structType.fieldMapping().fields);
break;
case TRAIT:
TraitType traitType = (TraitType) dataType;
- createIndexForFields(traitType, traitType.fieldMapping().fields);
+ createIndexForFields(management, traitType, traitType.fieldMapping().fields);
break;
case CLASS:
ClassType classType = (ClassType) dataType;
- createIndexForFields(classType, classType.fieldMapping().fields);
+ createIndexForFields(management, classType, classType.fieldMapping().fields);
break;
default:
@@ -196,26 +203,26 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
- private void createIndexForFields(IDataType dataType, Map<String, AttributeInfo> fields) {
+ private void createIndexForFields(TitanManagement management, IDataType dataType, Map<String, AttributeInfo> fields) {
for (AttributeInfo field : fields.values()) {
if (field.isIndexable) {
- createIndexForAttribute(dataType.getName(), field);
+ createIndexForAttribute(management, dataType.getName(), field);
}
}
}
- private void createIndexForAttribute(String typeName, AttributeInfo field) {
+ private void createIndexForAttribute(TitanManagement management, String typeName, AttributeInfo field) {
final String propertyName = typeName + "." + field.name;
switch (field.dataType().getTypeCategory()) {
case PRIMITIVE:
Cardinality cardinality = getCardinality(field.multiplicity);
- createCompositeAndMixedIndex(propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
+ createCompositeAndMixedIndex(management, propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
cardinality, false);
break;
case ENUM:
cardinality = getCardinality(field.multiplicity);
- createCompositeAndMixedIndex(propertyName, String.class, field.isUnique, cardinality, false);
+ createCompositeAndMixedIndex(management, propertyName, String.class, field.isUnique, cardinality, false);
break;
case ARRAY:
@@ -226,7 +233,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case STRUCT:
StructType structType = (StructType) field.dataType();
- createIndexForFields(structType, structType.fieldMapping().fields);
+ createIndexForFields(management, structType, structType.fieldMapping().fields);
break;
case TRAIT:
@@ -289,8 +296,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
- private PropertyKey createCompositeAndMixedIndex(String propertyName, Class propertyClass,
- boolean isUnique, Cardinality cardinality, boolean force) {
+ private PropertyKey createCompositeAndMixedIndex(TitanManagement management, String propertyName,
+ Class propertyClass,
+ boolean isUnique, Cardinality cardinality, boolean force) {
PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
@@ -329,7 +337,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
Cardinality.SET);
}
- public void commit() throws IndexException {
+ public void commit(TitanManagement management) throws IndexException {
try {
management.commit();
} catch (Exception e) {
@@ -338,7 +346,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
- public void rollback() throws IndexException {
+ public void rollback(TitanManagement management) throws IndexException {
try {
management.rollback();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index e326f27..40728bc 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -22,13 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider;
-
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
-import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.typestore.ITypeStore;
@@ -68,11 +66,8 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
-
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -86,32 +81,44 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
- private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
-
private final TypeSystem typeSystem;
private final MetadataRepository repository;
private final ITypeStore typeStore;
private IBootstrapTypesRegistrar typesRegistrar;
- private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
+
+ private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
+ private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
@Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
- final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws AtlasException {
- this(repository, typeStore, typesRegistrar, typeChangeListeners, TypeSystem.getInstance());
+ final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+ final Collection<Provider<EntityChangeListener>> entityListenerProviders)
+ throws AtlasException {
+ this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
+ TypeSystem.getInstance());
}
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
- final Collection<Provider<TypesChangeListener>> typeChangeListeners,
+ final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+ final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem) throws AtlasException {
this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem;
this.repository = repository;
- this.typeChangeListeners = typeChangeListeners;
+ for (Provider<TypesChangeListener> provider : typeListenerProviders) {
+ typeChangeListeners.add(provider.get());
+ }
+
+ for (Provider<EntityChangeListener> provider : entityListenerProviders) {
+ entityChangeListeners.add(provider.get());
+ }
+
restoreTypeSystem();
+
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
}
@@ -604,19 +611,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
- Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
- for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
- final TypesChangeListener listener = indexerProvider.get();
- try {
- listener.onAdd(typesAdded.values());
- } catch (IndexCreationException ice) {
- LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
- caughtExceptions.put(listener, ice);
- }
- }
-
- if (caughtExceptions.size() > 0) {
- throw new IndexCreationException("Index creation failed for types " + typesAdded.keySet() + ". Aborting");
+ for (TypesChangeListener listener : typeChangeListeners) {
+ listener.onAdd(typesAdded.values());
}
}
@@ -637,19 +633,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
- Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
- for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
- final TypesChangeListener listener = indexerProvider.get();
- try {
- listener.onChange(typesUpdated.values());
- } catch (IndexCreationException ice) {
- LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
- caughtExceptions.put(listener, ice);
- }
- }
-
- if (caughtExceptions.size() > 0) {
- throw new IndexCreationException("Index creation failed for types " + typesUpdated.keySet() + ". Aborting");
+ for (TypesChangeListener listener : typeChangeListeners) {
+ listener.onChange(typesUpdated.values());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
index 5b74dc8..b4a9cb2 100755
--- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
@@ -19,15 +19,12 @@
package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-
import org.apache.atlas.BaseHiveRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
@@ -46,7 +43,6 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -60,9 +56,6 @@ import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAt
public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
@Inject
- private GraphProvider<TitanGraph> graphProvider;
-
- @Inject
private MetadataRepository repositoryService;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
new file mode 100644
index 0000000..9c193f7
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+public class AuditRepositoryTestBase {
+ protected EntityAuditRepository eventRepository;
+
+ private String rand() {
+ return RandomStringUtils.randomAlphanumeric(10);
+ }
+
+ @Test
+ public void testAddEvents() throws Exception {
+ EntityAuditRepository.EntityAuditEvent event =
+ new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
+ EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, "d1");
+
+ eventRepository.putEvents(event);
+
+ List<EntityAuditRepository.EntityAuditEvent> events =
+ eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0), event);
+ }
+
+ @Test
+ public void testListPagination() throws Exception {
+ String id1 = "id1" + rand();
+ String id2 = "id2" + rand();
+ String id3 = "id3" + rand();
+ long ts = System.currentTimeMillis();
+ List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
+ for (int i = 0; i < 3; i++) {
+ //Add events for both ids
+ EntityAuditRepository.EntityAuditEvent event =
+ new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.ENTITY_UPDATE, "details" + i);
+ eventRepository.putEvents(event);
+ expectedEvents.add(event);
+ eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.TAG_DELETE, "details" + i));
+ eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.TAG_ADD, "details" + i));
+ }
+
+ //Use ts for which there is no event - ts + 2
+ List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
+ assertEquals(events.size(), 2);
+ assertEquals(events.get(0), expectedEvents.get(0));
+ assertEquals(events.get(1), expectedEvents.get(1));
+
+ //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
+ events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0), expectedEvents.get(2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
index ac52f29..677eb39 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
@@ -19,45 +19,24 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class HBaseBasedAuditRepositoryTest {
- private HBaseTestingUtility testUtility;
- private HBaseBasedAuditRepository eventRepository;
- private LocalHBaseCluster hbaseCluster;
+public class HBaseBasedAuditRepositoryTest extends AuditRepositoryTestBase {
private TableName tableName;
@BeforeClass
public void setup() throws Exception {
- testUtility = HBaseTestingUtility.createLocalHTU();
- testUtility.startMiniZKCluster();
- testUtility.getConfiguration().set("zookeeper.session.timeout.ms", "1000");
- hbaseCluster = new LocalHBaseCluster(testUtility.getConfiguration());
- hbaseCluster.startup();
-
- eventRepository = new HBaseBasedAuditRepository() {
- @Override
- public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf)
- throws AtlasException {
- return testUtility.getConfiguration();
- }
- };
- eventRepository.start();
+ eventRepository = new HBaseBasedAuditRepository();
+ HBaseTestUtils.startCluster();
+ ((HBaseBasedAuditRepository)eventRepository).start();
Configuration properties = ApplicationProperties.get();
String tableNameStr = properties.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
@@ -67,63 +46,14 @@ public class HBaseBasedAuditRepositoryTest {
@AfterClass
public void teardown() throws Exception {
- eventRepository.stop();
- testUtility.getConnection().close();
- hbaseCluster.shutdown();
- testUtility.shutdownMiniZKCluster();
- }
-
- private String rand() {
- return RandomStringUtils.randomAlphanumeric(10);
+ ((HBaseBasedAuditRepository)eventRepository).stop();
+ HBaseTestUtils.stopCluster();
}
@Test
public void testTableCreated() throws Exception {
- Admin admin = testUtility.getConnection().getAdmin();
+ Connection connection = HBaseTestUtils.getConnection();
+ Admin admin = connection.getAdmin();
assertTrue(admin.tableExists(tableName));
}
-
- @Test
- public void testAddEvents() throws Exception {
- EntityAuditRepository.EntityAuditEvent event =
- new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", "a1", "d1");
-
- eventRepository.putEvents(event);
-
- List<EntityAuditRepository.EntityAuditEvent> events =
- eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
- assertEquals(events.size(), 1);
- assertEquals(events.get(0), event);
- }
-
- @Test
- public void testListPagination() throws Exception {
- String id1 = "id1" + rand();
- String id2 = "id2" + rand();
- String id3 = "id3" + rand();
- long ts = System.nanoTime();
- List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
- for (int i = 0; i < 3; i++) {
- //Add events for both ids
- EntityAuditRepository.EntityAuditEvent event =
- new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, "action" + i, "details" + i);
- eventRepository.putEvents(event);
- expectedEvents.add(event);
- eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
- "action" + i, "details" + i));
- eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
- "action" + i, "details" + i));
- }
-
- //Use ts for which there is no event - ts + 2
- List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
- assertEquals(events.size(), 2);
- assertEquals(events.get(0), expectedEvents.get(0));
- assertEquals(events.get(1), expectedEvents.get(1));
-
- //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
- events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
- assertEquals(events.size(), 1);
- assertEquals(events.get(0), expectedEvents.get(2));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
new file mode 100644
index 0000000..0e43806
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.Connection;
+
+import java.io.IOException;
+
+public class HBaseTestUtils {
+ private static HBaseTestingUtility hbaseTestUtility;
+ private static LocalHBaseCluster hbaseCluster;
+
+ public static void startCluster() throws Exception {
+ Configuration hbaseConf =
+ HBaseBasedAuditRepository.getHBaseConfiguration(ApplicationProperties.get());
+ hbaseTestUtility = new HBaseTestingUtility(hbaseConf);
+ int zkPort = hbaseConf.getInt("hbase.zookeeper.property.clientPort", 19026);
+ hbaseTestUtility.startMiniZKCluster(1, zkPort);
+
+ hbaseCluster = new LocalHBaseCluster(hbaseTestUtility.getConfiguration());
+ hbaseCluster.startup();
+
+ RequestContext.createContext();
+ RequestContext.get().setUser("testuser");
+ }
+
+ public static void stopCluster() throws Exception {
+ hbaseTestUtility.getConnection().close();
+ hbaseCluster.shutdown();
+ hbaseTestUtility.shutdownMiniZKCluster();
+ }
+
+ public static Connection getConnection() throws IOException {
+ return hbaseTestUtility.getConnection();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
new file mode 100644
index 0000000..3bdfcf9
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.testng.annotations.BeforeClass;
+
+public class InMemoryAuditRepositoryTest extends AuditRepositoryTestBase {
+ @BeforeClass
+ public void setup() {
+ eventRepository = new InMemoryEntityAuditRepository();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0b01230..5ac0e43 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -25,6 +25,9 @@ import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
+import org.apache.atlas.repository.audit.HBaseTestUtils;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.types.ClassType;
@@ -71,14 +74,19 @@ import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest {
@Inject
private MetadataService metadataService;
+
@Inject
private GraphProvider<TitanGraph> graphProvider;
+ @Inject
+ private EntityAuditRepository repository;
+
private Referenceable db = createDBEntity();
private Id dbId;
@@ -90,6 +98,11 @@ public class DefaultMetadataServiceTest {
@BeforeTest
public void setUp() throws Exception {
+ if (repository instanceof HBaseBasedAuditRepository) {
+ HBaseTestUtils.startCluster();
+ ((HBaseBasedAuditRepository) repository).start();
+ }
+
TypesDef typesDef = TestUtils.defineHiveTypes();
try {
metadataService.getTypeDefinition(TestUtils.TABLE_TYPE);
@@ -109,7 +122,7 @@ public class DefaultMetadataServiceTest {
}
@AfterTest
- public void shutdown() {
+ public void shutdown() throws Exception {
TypeSystem.getInstance().reset();
try {
//TODO - Fix failure during shutdown while using BDB
@@ -122,6 +135,11 @@ public class DefaultMetadataServiceTest {
} catch(Exception e) {
e.printStackTrace();
}
+
+ if (repository instanceof HBaseBasedAuditRepository) {
+ ((HBaseBasedAuditRepository) repository).stop();
+ HBaseTestUtils.stopCluster();
+ }
}
private String createInstance(Referenceable entity) throws Exception {
@@ -172,6 +190,7 @@ public class DefaultMetadataServiceTest {
entity.set("type", "VARCHAR(32)");
return entity;
}
+
@Test(expectedExceptions = TypeNotFoundException.class)
public void testCreateEntityWithUnknownDatatype() throws Exception {
Referenceable entity = new Referenceable("Unknown datatype");
@@ -179,7 +198,7 @@ public class DefaultMetadataServiceTest {
entity.set("name", dbName);
entity.set("description", "us db");
createInstance(entity);
- Assert.fail(TypeNotFoundException.class.getSimpleName() +" was expected but none thrown.");
+ Assert.fail(TypeNotFoundException.class.getSimpleName() + " was expected but none thrown.");
}
@Test
@@ -187,6 +206,7 @@ public class DefaultMetadataServiceTest {
//name is the unique attribute
Referenceable entity = createDBEntity();
String id = createInstance(entity);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
@@ -199,6 +219,35 @@ public class DefaultMetadataServiceTest {
}
@Test
+ public void testEntityAudit() throws Exception {
+ //create entity
+ Referenceable entity = createDBEntity();
+ String id = createInstance(entity);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
+
+ Struct tag = new Struct(TestUtils.PII);
+ metadataService.addTrait(id, InstanceSerialization.toJson(tag, true));
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_ADD);
+
+ metadataService.deleteTrait(id, TestUtils.PII);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_DELETE);
+
+ metadataService.deleteEntities(Arrays.asList(id));
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE);
+ }
+
+ private void assertAuditEvents(String id, EntityAuditRepository.EntityAuditAction action) throws Exception {
+ List<EntityAuditRepository.EntityAuditEvent> events =
+ repository.listEvents(id, System.currentTimeMillis(), (short) 10);
+ for (EntityAuditRepository.EntityAuditEvent event : events) {
+ if (event.getAction() == action) {
+ return;
+ }
+ }
+ fail("Didn't find " + action + " in audit events");
+ }
+
+ @Test
public void testCreateEntityWithUniqueAttributeWithReference() throws Exception {
Referenceable db = createDBEntity();
String dbId = createInstance(db);
@@ -468,7 +517,7 @@ public class DefaultMetadataServiceTest {
tableDefinitionJson =
metadataService.getEntityDefinition(tableId._getId());
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
- Assert.assertNull(((Struct)tableDefinition.get("serde1")).get("description"));
+ Assert.assertNull(((Struct) tableDefinition.get("serde1")).get("description"));
}
@@ -718,8 +767,6 @@ public class DefaultMetadataServiceTest {
@Test
public void testDeleteEntities() throws Exception {
-
-
// Create 2 table entities, each with 3 composite column entities
Referenceable dbEntity = createDBEntity();
String dbGuid = createInstance(dbEntity);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
index 84ec761..0685e19 100644
--- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
@@ -20,18 +20,16 @@ package org.apache.atlas.services;
import com.google.inject.Provider;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
-import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.mockito.Matchers;
import org.testng.annotations.Test;
import java.util.ArrayList;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -45,7 +43,8 @@ public class DefaultMetadataServiceMockTest {
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
mock(ITypeStore.class),
- typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), typeSystem);
+ typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+ new ArrayList<Provider<EntityChangeListener>>(), typeSystem);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/pom.xml
----------------------------------------------------------------------
diff --git a/server-api/pom.xml b/server-api/pom.xml
index 8b4753a..93a0358 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -47,7 +47,6 @@
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
-
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/src/main/java/org/apache/atlas/RequestContext.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
new file mode 100644
index 0000000..943e4b8
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RequestContext {
+ private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
+
+ private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
+
+ private String user;
+
+ private RequestContext() {
+ }
+
+ public static RequestContext get() {
+ return CURRENT_CONTEXT.get();
+ }
+
+ public static RequestContext createContext() {
+ RequestContext context = new RequestContext();
+ CURRENT_CONTEXT.set(context);
+ return context;
+ }
+
+ public static void clear() {
+ CURRENT_CONTEXT.remove();
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
index 9e4aa79..b41f3db 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
-
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.TypesDef;
@@ -30,7 +29,6 @@ import org.apache.atlas.typesystem.exception.TypeExistsException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import javax.inject.Singleton;
-
import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
new file mode 100644
index 0000000..4e1cd36
--- /dev/null
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.typesystem.types;
+
+import com.google.inject.Provider;
+
+public class TypeSystemProvider implements Provider<TypeSystem> {
+ @Override
+ public TypeSystem get() {
+ return TypeSystem.getInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index 239ac95..9a32e04 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -71,6 +71,12 @@ atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
+######### Entity Audit Configs #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost
+atlas.audit.hbase.zookeeper.property.clientPort=19026
+
######### Security Properties #########
# SSL config
@@ -80,3 +86,5 @@ atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
+
+atlas.hook.falcon.synchronous=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 98be234..85c9471 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -342,10 +342,10 @@
</httpConnector>
<war>${project.build.directory}/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
- <!--<webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>-->
+ <webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>
<webApp>
<contextPath>/</contextPath>
- <descriptor>webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/src/test/webapp/WEB-INF/web.xml</descriptor>
<!-- ${project.build.directory}/atlas-webapp-${project.version} -->
<extraClasspath>${project.build.directory}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index ae37314..01b1cd3 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -20,22 +20,32 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.security.SecurityProperties;
+import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.NDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
-import java.util.Iterator;
import java.util.Properties;
/**
@@ -47,6 +57,27 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
static final String PREFIX = "atlas.http.authentication";
+ /**
+ * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+ * before invoking the actual resource.
+ */
+ private HttpServlet optionsServlet;
+
+ /**
+ * Initialize the filter.
+ *
+ * @param filterConfig filter configuration.
+ * @throws ServletException thrown if the filter could not be initialized.
+ */
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ LOG.info("AtlasAuthenticationFilter initialization started");
+ super.init(filterConfig);
+
+ optionsServlet = new HttpServlet() {};
+ optionsServlet.init();
+ }
+
@Override
protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) throws ServletException {
Configuration configuration;
@@ -94,4 +125,50 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
return config;
}
+ @Override
+ public void doFilter(final ServletRequest request, final ServletResponse response,
+ final FilterChain filterChain) throws IOException, ServletException {
+
+ FilterChain filterChainWrapper = new FilterChain() {
+
+ @Override
+ public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+ throws IOException, ServletException {
+ HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+ if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+ optionsServlet.service(request, response);
+ } else {
+ final String user = Servlets.getUserFromRequest(httpRequest);
+ if (StringUtils.isEmpty(user)) {
+ ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+ "Param user.name can't be empty");
+ } else {
+ try {
+ NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
+ RequestContext requestContext = RequestContext.get();
+ requestContext.setUser(user);
+ LOG.info("Request from authenticated user: {}, URL={}", user,
+ Servlets.getRequestURI(httpRequest));
+
+ filterChain.doFilter(servletRequest, servletResponse);
+ } finally {
+ NDC.pop();
+ }
+ }
+ }
+ }
+ };
+
+ super.doFilter(request, response, filterChainWrapper);
+ }
+
+ @Override
+ public void destroy() {
+ if (optionsServlet != null) {
+ optionsServlet.destroy();
+ }
+
+ super.destroy();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index c735ecd..9d60e1a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -20,6 +20,7 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.atlas.web.util.Servlets;
import org.slf4j.Logger;
@@ -60,15 +61,19 @@ public class AuditFilter implements Filter {
final String requestId = UUID.randomUUID().toString();
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
+ String user = getUserFromRequest(httpRequest);
try {
currentThread.setName(formatName(oldName, requestId));
- recordAudit(httpRequest, requestTimeISO9601);
+ RequestContext requestContext = RequestContext.createContext();
+ requestContext.setUser(user);
+ recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
} finally {
// put the request id into the response so users can trace logs for this request
((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
currentThread.setName(oldName);
+ RequestContext.clear();;
}
}
@@ -76,8 +81,7 @@ public class AuditFilter implements Filter {
return oldName + " - " + requestId;
}
- private void recordAudit(HttpServletRequest httpRequest, String whenISO9601) {
- final String who = getUserFromRequest(httpRequest);
+ private void recordAudit(HttpServletRequest httpRequest, String whenISO9601, String who) {
final String fromHost = httpRequest.getRemoteHost();
final String fromAddress = httpRequest.getRemoteAddr();
final String whatRequest = httpRequest.getMethod();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index c1f6a9b..dac89d7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.listeners;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceServletContextListener;
@@ -33,13 +34,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
-import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.commons.configuration.Configuration;
@@ -75,7 +72,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
- injector = Guice.createInjector(new RepositoryMetadataModule(), new NotificationModule(),
+ injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(),
new JerseyServletModule() {
@Override
protected void configureServlets() {
@@ -99,6 +96,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
try {
Configuration configuration = ApplicationProperties.get();
if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
+ LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class);
}
} catch (AtlasException e) {
@@ -113,13 +111,16 @@ public class GuiceServletConfig extends GuiceServletContextListener {
return injector;
}
+ protected Module getRepositoryModule() {
+ return new RepositoryMetadataModule();
+ }
+
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
super.contextInitialized(servletContextEvent);
installLogBridge();
- initMetadataService();
startServices();
}
@@ -148,7 +149,12 @@ public class GuiceServletConfig extends GuiceServletContextListener {
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get();
- graph.shutdown();
+
+ try {
+ graph.shutdown();
+ } catch(Throwable t) {
+ LOG.warn("Error while shutting down graph", t);
+ }
//stop services
stopServices();
@@ -160,17 +166,4 @@ public class GuiceServletConfig extends GuiceServletContextListener {
Services services = injector.getInstance(Services.class);
services.stop();
}
-
- // initialize the metadata service
- private void initMetadataService() {
- MetadataService metadataService = injector.getInstance(MetadataService.class);
-
- // add a listener for entity changes
- NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
-
- NotificationEntityChangeListener listener =
- new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
-
- metadataService.registerListener(listener);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index 871d857..2e75a61 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -45,9 +45,14 @@ public class EmbeddedServer {
Connector connector = getConnector(port);
server.addConnector(connector);
+ WebAppContext application = getWebAppContext(path);
+ server.setHandler(application);
+ }
+
+ protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
- server.setHandler(application);
+ return application;
}
public static EmbeddedServer newServer(int port, String path, boolean secure) throws IOException {