You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/03/31 12:51:23 UTC
[2/3] incubator-atlas git commit: ATLAS-577 Integrate entity audit
with DefaultMetadataService (shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5be00f5..c6d82aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1474,6 +1474,8 @@
<user.dir>${project.basedir}</user.dir>
<atlas.data>${project.build.directory}/data</atlas.data>
<log4j.configuration>atlas-log4j.xml</log4j.configuration>
+ <zookeeper.client.secure>false</zookeeper.client.secure>
+ <zookeeper.sasl.client>false</zookeeper.sasl.client>
</systemProperties>
<skipTests>${skipTests}</skipTests>
<forkMode>always</forkMode>
@@ -1483,9 +1485,6 @@
-Xmx1024m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true
</argLine>
<skip>${skipUTs}</skip>
- <excludes>
- <exclude>**/*Base*</exclude>
- </excludes>
</configuration>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4569e55..aaef9e3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)
ATLAS-408 UI : Add a close link (x) on the top right when Tag is added (darshankumar89 via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index 6502bba..eca087a 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -149,6 +149,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 7651bc7..4a02b0d 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -18,6 +18,7 @@
package org.apache.atlas;
+import com.google.inject.Binder;
import com.google.inject.Singleton;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
@@ -27,21 +28,26 @@ import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService;
-import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.audit.EntityAuditListener;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.repository.graph.TitanGraphProvider;
import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.service.Service;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeSystemProvider;
/**
* Guice module for Repository module.
@@ -51,9 +57,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
@Override
protected void configure() {
// special wiring for Titan Graph
-
-
-
ThrowingProviderBinder.create(binder()).bind(GraphProvider.class, TitanGraph.class).to(TitanGraphProvider.class)
.asEagerSingleton();
@@ -61,7 +64,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton();
- bind(TypeSystem.class).in(Singleton.class);
+ bind(TypeSystem.class).toProvider(TypeSystemProvider.class).in(Singleton.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
@@ -80,9 +83,24 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
+ bindAuditRepository(binder());
+
+ //Add EntityAuditListener as EntityChangeListener
+ Multibinder<EntityChangeListener> entityChangeListenerBinder =
+ Multibinder.newSetBinder(binder(), EntityChangeListener.class);
+ entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
+
MethodInterceptor interceptor = new GraphTransactionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
}
+ protected void bindAuditRepository(Binder binder) {
+ //Map EntityAuditRepository interface to hbase based implementation
+ binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton();
+
+ //Add HBaseBasedAuditRepository to service so that connection is closed at shutdown
+ Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
+ serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
new file mode 100644
index 0000000..0c5c551
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
+ */
+public class EntityAuditListener implements EntityChangeListener {
+ private EntityAuditRepository auditRepository;
+
+ @Inject
+ public EntityAuditListener(EntityAuditRepository auditRepository) {
+ this.auditRepository = auditRepository;
+ }
+
+ @Override
+ public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+ List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ for (ITypedReferenceableInstance entity : entities) {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
+ EntityAuditRepository.EntityAuditAction.ENTITY_CREATE,
+ "Created: " + InstanceSerialization.toJson(entity, true));
+ events.add(event);
+ }
+ auditRepository.putEvents(events);
+ }
+
+ private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
+ EntityAuditRepository.EntityAuditAction action,
+ String details) {
+ return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(),
+ action, details);
+ }
+
+ @Override
+ public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+
+ }
+
+ @Override
+ public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+ EntityAuditRepository.EntityAuditAction.TAG_ADD,
+ "Added trait: " + InstanceSerialization.toJson(trait, true));
+ auditRepository.putEvents(event);
+ }
+
+ @Override
+ public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+ EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
+ auditRepository.putEvents(event);
+ }
+
+ @Override
+ public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+ List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ for (ITypedReferenceableInstance entity : entities) {
+ EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
+ EntityAuditRepository.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
+ events.add(event);
+ }
+ auditRepository.putEvents(events);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index a5b4a59..d41c4da 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -27,6 +27,10 @@ import java.util.List;
* Interface for repository for storing entity audit events
*/
public interface EntityAuditRepository {
+ enum EntityAuditAction {
+ ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE;
+ }
+
/**
* Structure of entity audit event
*/
@@ -34,13 +38,13 @@ public interface EntityAuditRepository {
String entityId;
Long timestamp;
String user;
- String action;
+ EntityAuditAction action;
String details;
public EntityAuditEvent() {
}
- public EntityAuditEvent(String entityId, long ts, String user, String action, String details) {
+ public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
@@ -61,7 +65,7 @@ public interface EntityAuditRepository {
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp.longValue() == otherEvent.timestamp.longValue()) &&
- StringUtils.equals(user, otherEvent.user) && StringUtils.equals(action, otherEvent.action) &&
+ StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
StringUtils.equals(details, otherEvent.details);
}
@@ -77,6 +81,26 @@ public interface EntityAuditRepository {
.append(user).append(";Action=").append(action).append(";Details=").append(details);
return builder.toString();
}
+
+ public String getEntityId() {
+ return entityId;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public EntityAuditAction getAction() {
+ return action;
+ }
+
+ public String getDetails() {
+ return details;
+ }
}
/**
@@ -87,6 +111,13 @@ public interface EntityAuditRepository {
void putEvents(EntityAuditEvent... events) throws AtlasException;
/**
+ * Add events to the event repository
+ * @param events events to be added
+ * @throws AtlasException
+ */
+ void putEvents(List<EntityAuditEvent> events) throws AtlasException;
+
+ /**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
* @param entityId entity id
* @param ts starting timestamp for events
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 8b92792..ae6e988 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -80,16 +81,29 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @param events events to be added
* @throws AtlasException
*/
+ @Override
public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
- LOG.info("Putting {} events", events.length);
+ putEvents(Arrays.asList(events));
+ }
+
+ @Override
+ /**
+ * Add events to the event repository
+ * @param events events to be added
+ * @throws AtlasException
+ */
+ public void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+ LOG.info("Putting {} events", events.size());
Table table = null;
try {
table = connection.getTable(tableName);
- List<Put> puts = new ArrayList<>(events.length);
+ List<Put> puts = new ArrayList<>(events.size());
for (EntityAuditRepository.EntityAuditEvent event : events) {
LOG.debug("Adding entity audit event {}", event);
Put put = new Put(getKey(event.entityId, event.timestamp));
- addColumn(put, COLUMN_ACTION, event.action);
+ if (event.action != null) {
+ put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal()));
+ }
addColumn(put, COLUMN_USER, event.user);
addColumn(put, COLUMN_DETAIL, event.details);
puts.add(put);
@@ -145,7 +159,8 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
String key = Bytes.toString(result.getRow());
EntityAuditRepository.EntityAuditEvent event = fromKey(key);
event.user = getResultString(result, COLUMN_USER);
- event.action = getResultString(result, COLUMN_ACTION);
+ event.action =
+ EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))];
event.details = getResultString(result, COLUMN_DETAIL);
events.add(event);
}
@@ -189,7 +204,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @throws AtlasException
* @param atlasConf
*/
- public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
+ public static org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
Configuration subsetAtlasConf =
ApplicationProperties.getSubsetConfiguration(atlasConf, CONFIG_PREFIX);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
new file mode 100644
index 0000000..df75290
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.AtlasException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Entity audit repository where audit events are stored in-memory. Used only for integration tests
+ */
+public class InMemoryEntityAuditRepository implements EntityAuditRepository {
+ private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>();
+
+ @Override
+ public void putEvents(EntityAuditEvent... events) throws AtlasException {
+ putEvents(Arrays.asList(events));
+ }
+
+ @Override
+ public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
+ for (EntityAuditEvent event : events) {
+ auditEvents.put(event.entityId + (Long.MAX_VALUE - event.timestamp), event);
+ }
+ }
+
+ @Override
+ public List<EntityAuditEvent> listEvents(String entityId, Long ts, short maxResults)
+ throws AtlasException {
+ List<EntityAuditEvent> events = new ArrayList<>();
+ SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(entityId + (Long.MAX_VALUE - ts));
+ for (EntityAuditEvent event : subMap.values()) {
+ if (events.size() < maxResults && event.entityId.equals(entityId)) {
+ events.add(event);
+ }
+ }
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3ea5fde..7eccc58 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -59,8 +59,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
private final TitanGraph titanGraph;
- private TitanManagement management;
-
List<Class> MIXED_INDEX_EXCLUSIONS = new ArrayList() {{
add(Boolean.class);
add(BigDecimal.class);
@@ -68,57 +66,63 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}};
@Inject
- public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException {
+ public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException,
+ IndexException {
this.titanGraph = graphProvider.get();
/* Create the transaction for indexing.
*/
- management = titanGraph.getManagementSystem();
initialize();
}
/**
* Initializes the indices for the graph - create indices for Global Vertex Keys
*/
- private void initialize() {
- if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
- LOG.info("Global indexes already exist for graph");
- return;
- }
+ private void initialize() throws RepositoryException, IndexException {
+ TitanManagement management = titanGraph.getManagementSystem();
+ try {
+ if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
+ LOG.info("Global indexes already exist for graph");
+ return;
+ }
/* This is called only once, which is the first time Atlas types are made indexable .*/
- LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
- management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
- management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
+ LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
+ management.buildIndex(Constants.VERTEX_INDEX, Vertex.class).buildMixedIndex(Constants.BACKING_INDEX);
+ management.buildIndex(Constants.EDGE_INDEX, Edge.class).buildMixedIndex(Constants.BACKING_INDEX);
- // create a composite index for guid as its unique
- createCompositeAndMixedIndex(Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+ // create a composite index for guid as its unique
+ createCompositeAndMixedIndex(management, Constants.GUID_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
- // create a composite and mixed index for type since it can be combined with other keys
- createCompositeAndMixedIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+ // create a composite and mixed index for type since it can be combined with other keys
+ createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
+ true);
- // create a composite and mixed index for type since it can be combined with other keys
- createCompositeAndMixedIndex(Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+ // create a composite and mixed index for type since it can be combined with other keys
+ createCompositeAndMixedIndex(management, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+ true);
- // create a composite and mixed index for traitNames since it can be combined with other
- // keys. Traits must be a set and not a list.
- createCompositeAndMixedIndex(Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET, true);
+ // create a composite and mixed index for traitNames since it can be combined with other
+ // keys. Traits must be a set and not a list.
+ createCompositeAndMixedIndex(management, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, Cardinality.SET,
+ true);
- // Index for full text search
- createFullTextIndex();
+ // Index for full text search
+ createFullTextIndex(management);
- //Indexes for graph backed type system store
- createTypeStoreIndexes();
+ //Indexes for graph backed type system store
+ createTypeStoreIndexes(management);
- management.commit();
- //Make sure we acquire another transaction after commit for subsequent indexing
- management = titanGraph.getManagementSystem();
-
- LOG.info("Index creation for global keys complete.");
+ commit(management);
+ LOG.info("Index creation for global keys complete.");
+ } catch (Throwable t) {
+ rollback(management);
+ throw new RepositoryException(t);
+ }
}
- private void createFullTextIndex() {
+ private void createFullTextIndex(TitanManagement management) {
PropertyKey fullText =
management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY).dataType(String.class).make();
@@ -128,12 +132,14 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
LOG.info("Created mixed index for {}", Constants.ENTITY_TEXT_PROPERTY_KEY);
}
- private void createTypeStoreIndexes() {
+ private void createTypeStoreIndexes(TitanManagement management) {
//Create unique index on typeName
- createCompositeAndMixedIndex(Constants.TYPENAME_PROPERTY_KEY, String.class, true, Cardinality.SINGLE, true);
+ createCompositeAndMixedIndex(management, Constants.TYPENAME_PROPERTY_KEY, String.class, true,
+ Cardinality.SINGLE, true);
//create index on vertex type
- createCompositeAndMixedIndex(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
+ createCompositeAndMixedIndex(management, Constants.VERTEX_TYPE_PROPERTY_KEY, String.class, false,
+ Cardinality.SINGLE, true);
}
/**
@@ -144,21 +150,22 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
*/
@Override
public void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException {
-
+ TitanManagement management = titanGraph.getManagementSystem();
for (IDataType dataType : dataTypes) {
LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
try {
- addIndexForType(dataType);
+ addIndexForType(management, dataType);
LOG.info("Index creation for type {} complete", dataType.getName());
} catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable);
//Rollback indexes if any failure
- rollback();
+ rollback(management);
throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
}
}
+
//Commit indexes
- commit();
+ commit(management);
}
@Override
@@ -166,7 +173,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
onAdd(dataTypes);
}
- private void addIndexForType(IDataType dataType) {
+ private void addIndexForType(TitanManagement management, IDataType dataType) {
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
case ENUM:
@@ -178,17 +185,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case STRUCT:
StructType structType = (StructType) dataType;
- createIndexForFields(structType, structType.fieldMapping().fields);
+ createIndexForFields(management, structType, structType.fieldMapping().fields);
break;
case TRAIT:
TraitType traitType = (TraitType) dataType;
- createIndexForFields(traitType, traitType.fieldMapping().fields);
+ createIndexForFields(management, traitType, traitType.fieldMapping().fields);
break;
case CLASS:
ClassType classType = (ClassType) dataType;
- createIndexForFields(classType, classType.fieldMapping().fields);
+ createIndexForFields(management, classType, classType.fieldMapping().fields);
break;
default:
@@ -196,26 +203,26 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
- private void createIndexForFields(IDataType dataType, Map<String, AttributeInfo> fields) {
+ private void createIndexForFields(TitanManagement management, IDataType dataType, Map<String, AttributeInfo> fields) {
for (AttributeInfo field : fields.values()) {
if (field.isIndexable) {
- createIndexForAttribute(dataType.getName(), field);
+ createIndexForAttribute(management, dataType.getName(), field);
}
}
}
- private void createIndexForAttribute(String typeName, AttributeInfo field) {
+ private void createIndexForAttribute(TitanManagement management, String typeName, AttributeInfo field) {
final String propertyName = typeName + "." + field.name;
switch (field.dataType().getTypeCategory()) {
case PRIMITIVE:
Cardinality cardinality = getCardinality(field.multiplicity);
- createCompositeAndMixedIndex(propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
+ createCompositeAndMixedIndex(management, propertyName, getPrimitiveClass(field.dataType()), field.isUnique,
cardinality, false);
break;
case ENUM:
cardinality = getCardinality(field.multiplicity);
- createCompositeAndMixedIndex(propertyName, String.class, field.isUnique, cardinality, false);
+ createCompositeAndMixedIndex(management, propertyName, String.class, field.isUnique, cardinality, false);
break;
case ARRAY:
@@ -226,7 +233,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case STRUCT:
StructType structType = (StructType) field.dataType();
- createIndexForFields(structType, structType.fieldMapping().fields);
+ createIndexForFields(management, structType, structType.fieldMapping().fields);
break;
case TRAIT:
@@ -289,8 +296,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
- private PropertyKey createCompositeAndMixedIndex(String propertyName, Class propertyClass,
- boolean isUnique, Cardinality cardinality, boolean force) {
+ private PropertyKey createCompositeAndMixedIndex(TitanManagement management, String propertyName,
+ Class propertyClass,
+ boolean isUnique, Cardinality cardinality, boolean force) {
PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
@@ -329,7 +337,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
Cardinality.SET);
}
- public void commit() throws IndexException {
+ public void commit(TitanManagement management) throws IndexException {
try {
management.commit();
} catch (Exception e) {
@@ -338,7 +346,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
- public void rollback() throws IndexException {
+ public void rollback(TitanManagement management) throws IndexException {
try {
management.rollback();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index e326f27..40728bc 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -22,13 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider;
-
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
-import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.typestore.ITypeStore;
@@ -68,11 +66,8 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
-
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -86,32 +81,44 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
- private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
-
private final TypeSystem typeSystem;
private final MetadataRepository repository;
private final ITypeStore typeStore;
private IBootstrapTypesRegistrar typesRegistrar;
- private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
+
+ private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
+ private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
@Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
- final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws AtlasException {
- this(repository, typeStore, typesRegistrar, typeChangeListeners, TypeSystem.getInstance());
+ final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+ final Collection<Provider<EntityChangeListener>> entityListenerProviders)
+ throws AtlasException {
+ this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
+ TypeSystem.getInstance());
}
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
- final Collection<Provider<TypesChangeListener>> typeChangeListeners,
+ final Collection<Provider<TypesChangeListener>> typeListenerProviders,
+ final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem) throws AtlasException {
this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem;
this.repository = repository;
- this.typeChangeListeners = typeChangeListeners;
+ for (Provider<TypesChangeListener> provider : typeListenerProviders) {
+ typeChangeListeners.add(provider.get());
+ }
+
+ for (Provider<EntityChangeListener> provider : entityListenerProviders) {
+ entityChangeListeners.add(provider.get());
+ }
+
restoreTypeSystem();
+
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
}
@@ -604,19 +611,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
- Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
- for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
- final TypesChangeListener listener = indexerProvider.get();
- try {
- listener.onAdd(typesAdded.values());
- } catch (IndexCreationException ice) {
- LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
- caughtExceptions.put(listener, ice);
- }
- }
-
- if (caughtExceptions.size() > 0) {
- throw new IndexCreationException("Index creation failed for types " + typesAdded.keySet() + ". Aborting");
+ for (TypesChangeListener listener : typeChangeListeners) {
+ listener.onAdd(typesAdded.values());
}
}
@@ -637,19 +633,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
- Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
- for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
- final TypesChangeListener listener = indexerProvider.get();
- try {
- listener.onChange(typesUpdated.values());
- } catch (IndexCreationException ice) {
- LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
- caughtExceptions.put(listener, ice);
- }
- }
-
- if (caughtExceptions.size() > 0) {
- throw new IndexCreationException("Index creation failed for types " + typesUpdated.keySet() + ". Aborting");
+ for (TypesChangeListener listener : typeChangeListeners) {
+ listener.onChange(typesUpdated.values());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
index 5b74dc8..b4a9cb2 100755
--- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java
@@ -19,15 +19,12 @@
package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-
import org.apache.atlas.BaseHiveRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
@@ -46,7 +43,6 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -60,9 +56,6 @@ import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAt
public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
@Inject
- private GraphProvider<TitanGraph> graphProvider;
-
- @Inject
private MetadataRepository repositoryService;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
new file mode 100644
index 0000000..9c193f7
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+public class AuditRepositoryTestBase {
+ protected EntityAuditRepository eventRepository;
+
+ private String rand() {
+ return RandomStringUtils.randomAlphanumeric(10);
+ }
+
+ @Test
+ public void testAddEvents() throws Exception {
+ EntityAuditRepository.EntityAuditEvent event =
+ new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
+ EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, "d1");
+
+ eventRepository.putEvents(event);
+
+ List<EntityAuditRepository.EntityAuditEvent> events =
+ eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0), event);
+ }
+
+ @Test
+ public void testListPagination() throws Exception {
+ String id1 = "id1" + rand();
+ String id2 = "id2" + rand();
+ String id3 = "id3" + rand();
+ long ts = System.currentTimeMillis();
+ List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
+ for (int i = 0; i < 3; i++) {
+ //Add events for both ids
+ EntityAuditRepository.EntityAuditEvent event =
+ new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.ENTITY_UPDATE, "details" + i);
+ eventRepository.putEvents(event);
+ expectedEvents.add(event);
+ eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.TAG_DELETE, "details" + i));
+ eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
+ EntityAuditRepository.EntityAuditAction.TAG_ADD, "details" + i));
+ }
+
+ //Use ts for which there is no event - ts + 2
+ List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
+ assertEquals(events.size(), 2);
+ assertEquals(events.get(0), expectedEvents.get(0));
+ assertEquals(events.get(1), expectedEvents.get(1));
+
+ //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
+ events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0), expectedEvents.get(2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
index ac52f29..677eb39 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryTest.java
@@ -19,45 +19,24 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class HBaseBasedAuditRepositoryTest {
- private HBaseTestingUtility testUtility;
- private HBaseBasedAuditRepository eventRepository;
- private LocalHBaseCluster hbaseCluster;
+public class HBaseBasedAuditRepositoryTest extends AuditRepositoryTestBase {
private TableName tableName;
@BeforeClass
public void setup() throws Exception {
- testUtility = HBaseTestingUtility.createLocalHTU();
- testUtility.startMiniZKCluster();
- testUtility.getConfiguration().set("zookeeper.session.timeout.ms", "1000");
- hbaseCluster = new LocalHBaseCluster(testUtility.getConfiguration());
- hbaseCluster.startup();
-
- eventRepository = new HBaseBasedAuditRepository() {
- @Override
- public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf)
- throws AtlasException {
- return testUtility.getConfiguration();
- }
- };
- eventRepository.start();
+ eventRepository = new HBaseBasedAuditRepository();
+ HBaseTestUtils.startCluster();
+ ((HBaseBasedAuditRepository)eventRepository).start();
Configuration properties = ApplicationProperties.get();
String tableNameStr = properties.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
@@ -67,63 +46,14 @@ public class HBaseBasedAuditRepositoryTest {
@AfterClass
public void teardown() throws Exception {
- eventRepository.stop();
- testUtility.getConnection().close();
- hbaseCluster.shutdown();
- testUtility.shutdownMiniZKCluster();
- }
-
- private String rand() {
- return RandomStringUtils.randomAlphanumeric(10);
+ ((HBaseBasedAuditRepository)eventRepository).stop();
+ HBaseTestUtils.stopCluster();
}
@Test
public void testTableCreated() throws Exception {
- Admin admin = testUtility.getConnection().getAdmin();
+ Connection connection = HBaseTestUtils.getConnection();
+ Admin admin = connection.getAdmin();
assertTrue(admin.tableExists(tableName));
}
-
- @Test
- public void testAddEvents() throws Exception {
- EntityAuditRepository.EntityAuditEvent event =
- new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", "a1", "d1");
-
- eventRepository.putEvents(event);
-
- List<EntityAuditRepository.EntityAuditEvent> events =
- eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
- assertEquals(events.size(), 1);
- assertEquals(events.get(0), event);
- }
-
- @Test
- public void testListPagination() throws Exception {
- String id1 = "id1" + rand();
- String id2 = "id2" + rand();
- String id3 = "id3" + rand();
- long ts = System.nanoTime();
- List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
- for (int i = 0; i < 3; i++) {
- //Add events for both ids
- EntityAuditRepository.EntityAuditEvent event =
- new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, "action" + i, "details" + i);
- eventRepository.putEvents(event);
- expectedEvents.add(event);
- eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
- "action" + i, "details" + i));
- eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
- "action" + i, "details" + i));
- }
-
- //Use ts for which there is no event - ts + 2
- List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
- assertEquals(events.size(), 2);
- assertEquals(events.get(0), expectedEvents.get(0));
- assertEquals(events.get(1), expectedEvents.get(1));
-
- //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
- events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
- assertEquals(events.size(), 1);
- assertEquals(events.get(0), expectedEvents.get(2));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
new file mode 100644
index 0000000..0e43806
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseTestUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.Connection;
+
+import java.io.IOException;
+
+public class HBaseTestUtils {
+ private static HBaseTestingUtility hbaseTestUtility;
+ private static LocalHBaseCluster hbaseCluster;
+
+ public static void startCluster() throws Exception {
+ Configuration hbaseConf =
+ HBaseBasedAuditRepository.getHBaseConfiguration(ApplicationProperties.get());
+ hbaseTestUtility = new HBaseTestingUtility(hbaseConf);
+ int zkPort = hbaseConf.getInt("hbase.zookeeper.property.clientPort", 19026);
+ hbaseTestUtility.startMiniZKCluster(1, zkPort);
+
+ hbaseCluster = new LocalHBaseCluster(hbaseTestUtility.getConfiguration());
+ hbaseCluster.startup();
+
+ RequestContext.createContext();
+ RequestContext.get().setUser("testuser");
+ }
+
+ public static void stopCluster() throws Exception {
+ hbaseTestUtility.getConnection().close();
+ hbaseCluster.shutdown();
+ hbaseTestUtility.shutdownMiniZKCluster();
+ }
+
+ public static Connection getConnection() throws IOException {
+ return hbaseTestUtility.getConnection();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
new file mode 100644
index 0000000..3bdfcf9
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/InMemoryAuditRepositoryTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.audit;
+
+import org.testng.annotations.BeforeClass;
+
+public class InMemoryAuditRepositoryTest extends AuditRepositoryTestBase {
+ @BeforeClass
+ public void setup() {
+ eventRepository = new InMemoryEntityAuditRepository();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0b01230..5ac0e43 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -25,6 +25,9 @@ import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
+import org.apache.atlas.repository.audit.HBaseTestUtils;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.types.ClassType;
@@ -71,14 +74,19 @@ import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest {
@Inject
private MetadataService metadataService;
+
@Inject
private GraphProvider<TitanGraph> graphProvider;
+ @Inject
+ private EntityAuditRepository repository;
+
private Referenceable db = createDBEntity();
private Id dbId;
@@ -90,6 +98,11 @@ public class DefaultMetadataServiceTest {
@BeforeTest
public void setUp() throws Exception {
+ if (repository instanceof HBaseBasedAuditRepository) {
+ HBaseTestUtils.startCluster();
+ ((HBaseBasedAuditRepository) repository).start();
+ }
+
TypesDef typesDef = TestUtils.defineHiveTypes();
try {
metadataService.getTypeDefinition(TestUtils.TABLE_TYPE);
@@ -109,7 +122,7 @@ public class DefaultMetadataServiceTest {
}
@AfterTest
- public void shutdown() {
+ public void shutdown() throws Exception {
TypeSystem.getInstance().reset();
try {
//TODO - Fix failure during shutdown while using BDB
@@ -122,6 +135,11 @@ public class DefaultMetadataServiceTest {
} catch(Exception e) {
e.printStackTrace();
}
+
+ if (repository instanceof HBaseBasedAuditRepository) {
+ ((HBaseBasedAuditRepository) repository).stop();
+ HBaseTestUtils.stopCluster();
+ }
}
private String createInstance(Referenceable entity) throws Exception {
@@ -172,6 +190,7 @@ public class DefaultMetadataServiceTest {
entity.set("type", "VARCHAR(32)");
return entity;
}
+
@Test(expectedExceptions = TypeNotFoundException.class)
public void testCreateEntityWithUnknownDatatype() throws Exception {
Referenceable entity = new Referenceable("Unknown datatype");
@@ -179,7 +198,7 @@ public class DefaultMetadataServiceTest {
entity.set("name", dbName);
entity.set("description", "us db");
createInstance(entity);
- Assert.fail(TypeNotFoundException.class.getSimpleName() +" was expected but none thrown.");
+ Assert.fail(TypeNotFoundException.class.getSimpleName() + " was expected but none thrown.");
}
@Test
@@ -187,6 +206,7 @@ public class DefaultMetadataServiceTest {
//name is the unique attribute
Referenceable entity = createDBEntity();
String id = createInstance(entity);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
@@ -199,6 +219,35 @@ public class DefaultMetadataServiceTest {
}
@Test
+ public void testEntityAudit() throws Exception {
+ //create entity
+ Referenceable entity = createDBEntity();
+ String id = createInstance(entity);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
+
+ Struct tag = new Struct(TestUtils.PII);
+ metadataService.addTrait(id, InstanceSerialization.toJson(tag, true));
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_ADD);
+
+ metadataService.deleteTrait(id, TestUtils.PII);
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_DELETE);
+
+ metadataService.deleteEntities(Arrays.asList(id));
+ assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE);
+ }
+
+ private void assertAuditEvents(String id, EntityAuditRepository.EntityAuditAction action) throws Exception {
+ List<EntityAuditRepository.EntityAuditEvent> events =
+ repository.listEvents(id, System.currentTimeMillis(), (short) 10);
+ for (EntityAuditRepository.EntityAuditEvent event : events) {
+ if (event.getAction() == action) {
+ return;
+ }
+ }
+ fail("Didn't find " + action + " in audit events");
+ }
+
+ @Test
public void testCreateEntityWithUniqueAttributeWithReference() throws Exception {
Referenceable db = createDBEntity();
String dbId = createInstance(db);
@@ -468,7 +517,7 @@ public class DefaultMetadataServiceTest {
tableDefinitionJson =
metadataService.getEntityDefinition(tableId._getId());
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
- Assert.assertNull(((Struct)tableDefinition.get("serde1")).get("description"));
+ Assert.assertNull(((Struct) tableDefinition.get("serde1")).get("description"));
}
@@ -718,8 +767,6 @@ public class DefaultMetadataServiceTest {
@Test
public void testDeleteEntities() throws Exception {
-
-
// Create 2 table entities, each with 3 composite column entities
Referenceable dbEntity = createDBEntity();
String dbGuid = createInstance(dbEntity);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
index 84ec761..0685e19 100644
--- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java
@@ -20,18 +20,16 @@ package org.apache.atlas.services;
import com.google.inject.Provider;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
-import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.mockito.Matchers;
import org.testng.annotations.Test;
import java.util.ArrayList;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -45,7 +43,8 @@ public class DefaultMetadataServiceMockTest {
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
mock(ITypeStore.class),
- typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), typeSystem);
+ typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
+ new ArrayList<Provider<EntityChangeListener>>(), typeSystem);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/pom.xml
----------------------------------------------------------------------
diff --git a/server-api/pom.xml b/server-api/pom.xml
index 8b4753a..93a0358 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -47,7 +47,6 @@
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
-
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/server-api/src/main/java/org/apache/atlas/RequestContext.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
new file mode 100644
index 0000000..943e4b8
--- /dev/null
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RequestContext {
+ private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
+
+ private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
+
+ private String user;
+
+ private RequestContext() {
+ }
+
+ public static RequestContext get() {
+ return CURRENT_CONTEXT.get();
+ }
+
+ public static RequestContext createContext() {
+ RequestContext context = new RequestContext();
+ CURRENT_CONTEXT.set(context);
+ return context;
+ }
+
+ public static void clear() {
+ CURRENT_CONTEXT.remove();
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
index 9e4aa79..b41f3db 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
-
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.TypesDef;
@@ -30,7 +29,6 @@ import org.apache.atlas.typesystem.exception.TypeExistsException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import javax.inject.Singleton;
-
import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
new file mode 100644
index 0000000..4e1cd36
--- /dev/null
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystemProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.typesystem.types;
+
+import com.google.inject.Provider;
+
+public class TypeSystemProvider implements Provider<TypeSystem> {
+ @Override
+ public TypeSystem get() {
+ return TypeSystem.getInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index 239ac95..9a32e04 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -71,6 +71,12 @@ atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
+######### Entity Audit Configs #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost
+atlas.audit.hbase.zookeeper.property.clientPort=19026
+
######### Security Properties #########
# SSL config
@@ -80,3 +86,5 @@ atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
+
+atlas.hook.falcon.synchronous=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 98be234..85c9471 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -342,10 +342,10 @@
</httpConnector>
<war>${project.build.directory}/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
- <!--<webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>-->
+ <webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>
<webApp>
<contextPath>/</contextPath>
- <descriptor>webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ <descriptor>${project.basedir}/src/test/webapp/WEB-INF/web.xml</descriptor>
<!-- ${project.build.directory}/atlas-webapp-${project.version} -->
<extraClasspath>${project.build.directory}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
index ae37314..01b1cd3 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java
@@ -20,22 +20,32 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.security.SecurityProperties;
+import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.NDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
-import java.util.Iterator;
import java.util.Properties;
/**
@@ -47,6 +57,27 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
static final String PREFIX = "atlas.http.authentication";
+ /**
+ * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+ * before invoking the actual resource.
+ */
+ private HttpServlet optionsServlet;
+
+ /**
+ * Initialize the filter.
+ *
+ * @param filterConfig filter configuration.
+ * @throws ServletException thrown if the filter could not be initialized.
+ */
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ LOG.info("AtlasAuthenticationFilter initialization started");
+ super.init(filterConfig);
+
+ optionsServlet = new HttpServlet() {};
+ optionsServlet.init();
+ }
+
@Override
protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) throws ServletException {
Configuration configuration;
@@ -94,4 +125,50 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
return config;
}
+ @Override
+ public void doFilter(final ServletRequest request, final ServletResponse response,
+ final FilterChain filterChain) throws IOException, ServletException {
+
+ FilterChain filterChainWrapper = new FilterChain() {
+
+ @Override
+ public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+ throws IOException, ServletException {
+ HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+ if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+ optionsServlet.service(request, response);
+ } else {
+ final String user = Servlets.getUserFromRequest(httpRequest);
+ if (StringUtils.isEmpty(user)) {
+ ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+ "Param user.name can't be empty");
+ } else {
+ try {
+ NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
+ RequestContext requestContext = RequestContext.get();
+ requestContext.setUser(user);
+ LOG.info("Request from authenticated user: {}, URL={}", user,
+ Servlets.getRequestURI(httpRequest));
+
+ filterChain.doFilter(servletRequest, servletResponse);
+ } finally {
+ NDC.pop();
+ }
+ }
+ }
+ }
+ };
+
+ super.doFilter(request, response, filterChainWrapper);
+ }
+
+ @Override
+ public void destroy() {
+ if (optionsServlet != null) {
+ optionsServlet.destroy();
+ }
+
+ super.destroy();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index c735ecd..9d60e1a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -20,6 +20,7 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.atlas.web.util.Servlets;
import org.slf4j.Logger;
@@ -60,15 +61,19 @@ public class AuditFilter implements Filter {
final String requestId = UUID.randomUUID().toString();
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
+ String user = getUserFromRequest(httpRequest);
try {
currentThread.setName(formatName(oldName, requestId));
- recordAudit(httpRequest, requestTimeISO9601);
+ RequestContext requestContext = RequestContext.createContext();
+ requestContext.setUser(user);
+ recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
} finally {
// put the request id into the response so users can trace logs for this request
((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
currentThread.setName(oldName);
+ RequestContext.clear();;
}
}
@@ -76,8 +81,7 @@ public class AuditFilter implements Filter {
return oldName + " - " + requestId;
}
- private void recordAudit(HttpServletRequest httpRequest, String whenISO9601) {
- final String who = getUserFromRequest(httpRequest);
+ private void recordAudit(HttpServletRequest httpRequest, String whenISO9601, String who) {
final String fromHost = httpRequest.getRemoteHost();
final String fromAddress = httpRequest.getRemoteAddr();
final String whatRequest = httpRequest.getMethod();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index c1f6a9b..dac89d7 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -21,6 +21,7 @@ package org.apache.atlas.web.listeners;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceServletContextListener;
@@ -33,13 +34,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
-import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.commons.configuration.Configuration;
@@ -75,7 +72,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
- injector = Guice.createInjector(new RepositoryMetadataModule(), new NotificationModule(),
+ injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(),
new JerseyServletModule() {
@Override
protected void configureServlets() {
@@ -99,6 +96,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
try {
Configuration configuration = ApplicationProperties.get();
if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
+ LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class);
}
} catch (AtlasException e) {
@@ -113,13 +111,16 @@ public class GuiceServletConfig extends GuiceServletContextListener {
return injector;
}
+ protected Module getRepositoryModule() {
+ return new RepositoryMetadataModule();
+ }
+
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
super.contextInitialized(servletContextEvent);
installLogBridge();
- initMetadataService();
startServices();
}
@@ -148,7 +149,12 @@ public class GuiceServletConfig extends GuiceServletContextListener {
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get();
- graph.shutdown();
+
+ try {
+ graph.shutdown();
+ } catch(Throwable t) {
+ LOG.warn("Error while shutting down graph", t);
+ }
//stop services
stopServices();
@@ -160,17 +166,4 @@ public class GuiceServletConfig extends GuiceServletContextListener {
Services services = injector.getInstance(Services.class);
services.stop();
}
-
- // initialize the metadata service
- private void initMetadataService() {
- MetadataService metadataService = injector.getInstance(MetadataService.class);
-
- // add a listener for entity changes
- NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
-
- NotificationEntityChangeListener listener =
- new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
-
- metadataService.registerListener(listener);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index 871d857..2e75a61 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -45,9 +45,14 @@ public class EmbeddedServer {
Connector connector = getConnector(port);
server.addConnector(connector);
+ WebAppContext application = getWebAppContext(path);
+ server.setHandler(application);
+ }
+
+ protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
- server.setHandler(application);
+ return application;
}
public static EmbeddedServer newServer(int port, String path, boolean secure) throws IOException {