You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2016/11/04 16:54:49 UTC
incubator-atlas git commit: ATLAS-1266: fixed typedef APIs to update
type-registry only on successful graph commit
Repository: incubator-atlas
Updated Branches:
refs/heads/master 6a24cad18 -> f3bbdc151
ATLAS-1266: fixed typedef APIs to update type-registry only on successful graph commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f3bbdc15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f3bbdc15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f3bbdc15
Branch: refs/heads/master
Commit: f3bbdc151d3aed6c8843f763c06d12c81033de7b
Parents: 6a24cad
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Nov 2 17:13:00 2016 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Nov 3 13:40:17 2016 -0700
----------------------------------------------------------------------
.../apache/atlas/type/AtlasTypeRegistry.java | 152 +++++++++++----
.../atlas/GraphTransactionInterceptor.java | 62 ++++--
.../apache/atlas/RepositoryMetadataModule.java | 2 +
.../graph/GraphBackedSearchIndexer.java | 2 +-
.../store/graph/AtlasTypeDefGraphStore.java | 191 +++++++------------
.../graph/v1/AtlasTypeDefGraphStoreV1.java | 38 ++--
6 files changed, 249 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 95a5054..8924d44 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -32,8 +32,10 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -110,6 +112,9 @@ public class AtlasTypeRegistry {
return ret;
}
+ public AtlasBaseTypeDef getTypeDefByName(String name) { return registryData.getTypeDefByName(name); }
+
+ public AtlasBaseTypeDef getTypeDefByGuid(String guid) { return registryData.getTypeDefByGuid(guid); }
public Collection<AtlasEnumDef> getAllEnumDefs() { return registryData.enumDefs.getAll(); }
@@ -168,6 +173,7 @@ public class AtlasTypeRegistry {
final TypeDefCache<AtlasStructDef> structDefs;
final TypeDefCache<AtlasClassificationDef> classificationDefs;
final TypeDefCache<AtlasEntityDef> entityDefs;
+ final TypeDefCache<? extends AtlasBaseTypeDef>[] allDefCaches;
RegistryData() {
allTypes = new TypeCache();
@@ -175,6 +181,7 @@ public class AtlasTypeRegistry {
structDefs = new TypeDefCache<>(allTypes);
classificationDefs = new TypeDefCache<>(allTypes);
entityDefs = new TypeDefCache<>(allTypes);
+ allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs };
allTypes.addType(new AtlasBuiltInTypes.AtlasBooleanType());
allTypes.addType(new AtlasBuiltInTypes.AtlasByteType());
@@ -196,6 +203,39 @@ public class AtlasTypeRegistry {
structDefs = new TypeDefCache<>(other.structDefs, allTypes);
classificationDefs = new TypeDefCache<>(other.classificationDefs, allTypes);
entityDefs = new TypeDefCache<>(other.entityDefs, allTypes);
+ allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs };
+ }
+
+ AtlasBaseTypeDef getTypeDefByName(String name) {
+ AtlasBaseTypeDef ret = null;
+
+ if (name != null) {
+ for (TypeDefCache typeDefCache : allDefCaches) {
+ ret = typeDefCache.getTypeDefByName(name);
+
+ if (ret != null) {
+ break;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ AtlasBaseTypeDef getTypeDefByGuid(String guid) {
+ AtlasBaseTypeDef ret = null;
+
+ if (guid != null) {
+ for (TypeDefCache typeDefCache : allDefCaches) {
+ ret = typeDefCache.getTypeDefByGuid(guid);
+
+ if (ret != null) {
+ break;
+ }
+ }
+ }
+
+ return ret;
}
void updateGuid(String typeName, String guid) {
@@ -227,6 +267,10 @@ public class AtlasTypeRegistry {
}
public static class AtlasTransientTypeRegistry extends AtlasTypeRegistry {
+ private List<AtlasBaseTypeDef> addedTypes = new ArrayList<>();
+ private List<AtlasBaseTypeDef> updatedTypes = new ArrayList<>();
+ private List<AtlasBaseTypeDef> deletedTypes = new ArrayList<>();
+
private AtlasTransientTypeRegistry(AtlasTypeRegistry parent) {
super(parent);
@@ -261,7 +305,6 @@ public class AtlasTypeRegistry {
registryData.updateGuid(typeName, guid);
-
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateGuid({}, {})", typeName, guid);
}
@@ -391,9 +434,15 @@ public class AtlasTypeRegistry {
}
if (guid != null) {
+ AtlasBaseTypeDef typeDef = getTypeDefByGuid(guid);
+
registryData.removeByGuid(guid);
resolveReferences();
+
+ if (typeDef != null) {
+ deletedTypes.add(typeDef);
+ }
}
if (LOG.isDebugEnabled()) {
@@ -407,9 +456,15 @@ public class AtlasTypeRegistry {
}
if (name != null) {
+ AtlasBaseTypeDef typeDef = getTypeDefByName(name);
+
registryData.removeByName(name);
resolveReferences();
+
+ if (typeDef != null) {
+ deletedTypes.add(typeDef);
+ }
}
if (LOG.isDebugEnabled()) {
@@ -417,6 +472,12 @@ public class AtlasTypeRegistry {
}
}
+ public List<AtlasBaseTypeDef> getAddedTypes() { return addedTypes; }
+
+ public List<AtlasBaseTypeDef> getUpdatedTypes() { return updatedTypes; }
+
+ public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; }
+
private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) {
if (LOG.isDebugEnabled()) {
@@ -442,6 +503,8 @@ public class AtlasTypeRegistry {
registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
}
+
+ addedTypes.add(typeDef);
}
if (LOG.isDebugEnabled()) {
@@ -490,29 +553,32 @@ public class AtlasTypeRegistry {
LOG.debug("==> AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid);
}
- if (guid == null || typeDef == null) {
+ if (guid != null && typeDef != null) {
// ignore
- } else if (typeDef.getClass().equals(AtlasEnumDef.class)) {
- AtlasEnumDef enumDef = (AtlasEnumDef)typeDef;
+ if (typeDef.getClass().equals(AtlasEnumDef.class)) {
+ AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
- registryData.enumDefs.removeTypeDefByGuid(guid);
- registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
- } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
- AtlasStructDef structDef = (AtlasStructDef)typeDef;
+ registryData.enumDefs.removeTypeDefByGuid(guid);
+ registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
+ } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
+ AtlasStructDef structDef = (AtlasStructDef) typeDef;
- registryData.structDefs.removeTypeDefByGuid(guid);
- registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
- } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
- AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef;
+ registryData.structDefs.removeTypeDefByGuid(guid);
+ registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
+ } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
+ AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef;
- registryData.classificationDefs.removeTypeDefByGuid(guid);
- registryData.classificationDefs.addType(classificationDef,
- new AtlasClassificationType(classificationDef));
- } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
- AtlasEntityDef entityDef = (AtlasEntityDef)typeDef;
+ registryData.classificationDefs.removeTypeDefByGuid(guid);
+ registryData.classificationDefs.addType(classificationDef,
+ new AtlasClassificationType(classificationDef));
+ } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+ AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
+
+ registryData.entityDefs.removeTypeDefByGuid(guid);
+ registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
+ }
- registryData.entityDefs.removeTypeDefByGuid(guid);
- registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
+ updatedTypes.add(typeDef);
}
if (LOG.isDebugEnabled()) {
@@ -525,29 +591,31 @@ public class AtlasTypeRegistry {
LOG.debug("==> AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name);
}
- if (name == null || typeDef == null) {
- // ignore
- } else if (typeDef.getClass().equals(AtlasEnumDef.class)) {
- AtlasEnumDef enumDef = (AtlasEnumDef)typeDef;
-
- registryData.enumDefs.removeTypeDefByName(name);
- registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
- } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
- AtlasStructDef structDef = (AtlasStructDef)typeDef;
-
- registryData.structDefs.removeTypeDefByName(name);
- registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
- } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
- AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef;
-
- registryData.classificationDefs.removeTypeDefByName(name);
- registryData.classificationDefs.addType(classificationDef,
- new AtlasClassificationType(classificationDef));
- } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
- AtlasEntityDef entityDef = (AtlasEntityDef)typeDef;
-
- registryData.entityDefs.removeTypeDefByName(name);
- registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
+ if (name != null && typeDef != null) {
+ if (typeDef.getClass().equals(AtlasEnumDef.class)) {
+ AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
+
+ registryData.enumDefs.removeTypeDefByName(name);
+ registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
+ } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
+ AtlasStructDef structDef = (AtlasStructDef) typeDef;
+
+ registryData.structDefs.removeTypeDefByName(name);
+ registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
+ } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
+ AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef;
+
+ registryData.classificationDefs.removeTypeDefByName(name);
+ registryData.classificationDefs.addType(classificationDef,
+ new AtlasClassificationType(classificationDef));
+ } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+ AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
+
+ registryData.entityDefs.removeTypeDefByName(name);
+ registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
+ }
+
+ updatedTypes.add(typeDef);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 1f8affe..c773bac 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -26,8 +26,14 @@ import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
+
+ private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
+
private AtlasGraph graph;
@Override
@@ -37,19 +43,38 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
graph = AtlasGraphProvider.getGraphInstance();
}
+ boolean isSuccess = false;
+
try {
- Object response = invocation.proceed();
- graph.commit();
- LOG.info("graph commit");
- return response;
- } catch (Throwable t) {
- if (logException(t)) {
- LOG.error("graph rollback due to exception ", t);
- } else {
- LOG.error("graph rollback due to exception " + t.getClass().getSimpleName() + ":" + t.getMessage());
+ try {
+ Object response = invocation.proceed();
+ graph.commit();
+ isSuccess = true;
+ LOG.info("graph commit");
+ return response;
+ } catch (Throwable t) {
+ if (logException(t)) {
+ LOG.error("graph rollback due to exception ", t);
+ } else {
+ LOG.error("graph rollback due to exception " + t.getClass().getSimpleName() + ":" + t.getMessage());
+ }
+ graph.rollback();
+ throw t;
+ }
+ } finally {
+ List<PostTransactionHook> trxHooks = postTransactionHooks.get();
+
+ if (trxHooks != null) {
+ postTransactionHooks.remove();
+
+ for (PostTransactionHook trxHook : trxHooks) {
+ try {
+ trxHook.onComplete(isSuccess);
+ } catch (Throwable t) {
+ LOG.error("postTransactionHook failed", t);
+ }
+ }
}
- graph.rollback();
- throw t;
}
}
@@ -59,4 +84,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
return true;
}
+
+ public static abstract class PostTransactionHook {
+ protected PostTransactionHook() {
+ List<PostTransactionHook> trxHooks = postTransactionHooks.get();
+
+ if (trxHooks == null) {
+ trxHooks = new ArrayList<>();
+ postTransactionHooks.set(trxHooks);
+ }
+
+ trxHooks.add(this);
+ }
+
+ public abstract void onComplete(boolean isSuccess);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 0325c80..cd44318 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -48,6 +48,7 @@ import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeSystemProvider;
import org.apache.atlas.typesystem.types.cache.TypeCache;
@@ -71,6 +72,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
bind(AtlasTypeDefStore.class).to(AtlasTypeDefGraphStoreV1.class).asEagerSingleton();
+ bind(AtlasTypeRegistry.class).asEagerSingleton();
//GraphBackedSearchIndexer must be an eager singleton to force the search index creation to happen before
//we try to restore the type system (otherwise we'll end up running queries
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3c7f63b..67b5362 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -285,7 +285,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
} else if (isEnumType(atlasType)) {
createIndexes(management, propertyName, String.class, isUnique, cardinality, false, isIndexable);
} else if (isStructType(atlasType)) {
- AtlasStructDef structDef = typeRegistry.getStructDefByName(attributeDef.getName());
+ AtlasStructDef structDef = typeRegistry.getStructDefByName(attribTypeName);
updateIndexForTypeDef(management, structDef);
}
} catch (AtlasBaseException e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index 68d2781..2163e01 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -17,18 +17,15 @@
*/
package org.apache.atlas.repository.store.graph;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
+import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.SearchFilter;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs;
import org.apache.atlas.model.typedef.AtlasEntityDef;
@@ -49,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -106,9 +102,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -155,9 +149,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -171,9 +163,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -189,9 +179,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEnumDefStore(ttr).deleteByName(name);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -205,9 +193,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEnumDefStore(ttr).deleteByGuid(guid);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -231,9 +217,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -280,9 +264,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -296,9 +278,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -314,9 +294,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getStructDefStore(ttr).deleteByName(name, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -330,9 +308,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getStructDefStore(ttr).deleteByGuid(guid, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -357,9 +333,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -408,9 +382,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -425,9 +397,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -443,9 +413,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getClassificationDefStore(ttr).deleteByName(name, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -459,9 +427,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getClassificationDefStore(ttr).deleteByGuid(guid, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -485,9 +451,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -534,9 +498,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -550,9 +512,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef);
- notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
return ret;
}
@@ -568,9 +528,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEntityDefStore(ttr).deleteByName(name, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -584,9 +542,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEntityDefStore(ttr).deleteByGuid(guid, null);
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
}
@Override
@@ -689,18 +645,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- List<AtlasBaseTypeDef> createdTypeDefs = new ArrayList<>();
- createdTypeDefs.addAll(ret.getEnumDefs());
- createdTypeDefs.addAll(ret.getStructDefs());
- createdTypeDefs.addAll(ret.getClassificationDefs());
- createdTypeDefs.addAll(ret.getEntityDefs());
-
- ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
- changedTypeDefs.setCreateTypeDefs(createdTypeDefs);
-
- notifyListeners(changedTypeDefs);
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
@@ -759,18 +704,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- List<AtlasBaseTypeDef> updatedTypeDefs = new ArrayList<>();
- updatedTypeDefs.addAll(ret.getEnumDefs());
- updatedTypeDefs.addAll(ret.getStructDefs());
- updatedTypeDefs.addAll(ret.getClassificationDefs());
- updatedTypeDefs.addAll(ret.getEntityDefs());
-
- ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
- changedTypeDefs.setUpdatedTypeDefs(updatedTypeDefs);
-
- notifyListeners(changedTypeDefs);
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
@@ -884,12 +818,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- Iterable<AtlasBaseTypeDef> deleted = Iterables.concat(typesDef.getEnumDefs(), typesDef.getClassificationDefs(),
- typesDef.getClassificationDefs(), typesDef.getEntityDefs());
-
- notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(deleted));
-
- typeRegistry.commitTransientTypeRegistry(ttr);
+ updateTypeRegistryPostCommit(ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
@@ -957,38 +886,50 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
LOG.info("Not reacting to a Passive state change");
}
- private void notifyListeners(TypeDefChangeType type, List<? extends AtlasBaseTypeDef> typeDefs)
- throws AtlasBaseException {
- ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
- switch (type) {
- case CREATE:
- changedTypeDefs.setCreateTypeDefs(typeDefs);
- break;
- case UPDATE:
- changedTypeDefs.setUpdatedTypeDefs(typeDefs);
- break;
- case DELETE:
- changedTypeDefs.setDeletedTypeDefs(typeDefs);
- break;
- }
-
- notifyListeners(changedTypeDefs);
- }
-
- private void notifyListeners(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
- if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) {
- for (TypeDefChangeListener changeListener : typeDefChangeListeners) {
- try {
- changeListener.onChange(changedTypeDefs);
- } catch (AtlasBaseException e) {
- LOG.error("OnChange failed for listener {}", changeListener.getClass().getName());
- throw e;
- }
+ private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) {
+ new TypeRegistryUpdateHook(ttr);
+ }
+
+ private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
+ private final AtlasTransientTypeRegistry ttr;
+
+ private TypeRegistryUpdateHook(AtlasTransientTypeRegistry ttr) {
+ super();
+
+ this.ttr = ttr;
+ }
+
+ @Override
+ public void onComplete(boolean isSuccess) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
+ }
+
+ if (isSuccess) {
+ typeRegistry.commitTransientTypeRegistry(ttr);
+
+ notifyListeners(ttr);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
}
- }
- private enum TypeDefChangeType {
- CREATE, UPDATE, DELETE
+ private void notifyListeners(AtlasTransientTypeRegistry ttr) {
+ if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) {
+ ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(ttr.getAddedTypes(),
+ ttr.getUpdatedTypes(),
+ ttr.getDeleteedTypes());
+
+ for (TypeDefChangeListener changeListener : typeDefChangeListeners) {
+ try {
+ changeListener.onChange(changedTypeDefs);
+ } catch (Throwable t) {
+ LOG.error("OnChange failed for listener {}", changeListener.getClass().getName(), t);
+ }
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
index 878f355..73b64a3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
@@ -110,9 +110,9 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
}
- public AtlasGraph getAtlasGraph() { return atlasGraph; }
+ AtlasGraph getAtlasGraph() { return atlasGraph; }
- public AtlasVertex findTypeVertexByName(String typeName) {
+ AtlasVertex findTypeVertexByName(String typeName) {
Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.TYPENAME_PROPERTY_KEY, typeName)
.vertices().iterator();
@@ -122,7 +122,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) {
+ AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) {
Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.TYPENAME_PROPERTY_KEY, typeName)
.has(TYPE_CATEGORY_PROPERTY_KEY, category)
@@ -133,7 +133,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasVertex findTypeVertexByGuid(String typeGuid) {
+ AtlasVertex findTypeVertexByGuid(String typeGuid) {
Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.GUID_PROPERTY_KEY, typeGuid)
.vertices().iterator();
@@ -143,7 +143,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) {
+ AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) {
Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.GUID_PROPERTY_KEY, typeGuid)
.has(TYPE_CATEGORY_PROPERTY_KEY, category)
@@ -154,7 +154,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) {
+ Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) {
Iterator<AtlasVertex> ret = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(TYPE_CATEGORY_PROPERTY_KEY, category)
.vertices().iterator();
@@ -162,7 +162,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) {
+ AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) {
// Validate all the required checks
Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty");
@@ -203,7 +203,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) {
+ void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) {
if (!isTypeVertex(vertex)) {
LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex);
@@ -223,7 +223,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
markVertexUpdated(vertex);
}
- public void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException {
+ void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException {
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT);
for (AtlasEdge edge : edges) {
@@ -231,7 +231,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
}
}
- public void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException {
+ void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException {
Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator();
if (inEdges.hasNext()) {
@@ -247,7 +247,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
atlasGraph.removeVertex(vertex);
}
- public void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) {
+ void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) {
String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class);
String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class);
@@ -274,7 +274,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
}
}
- public boolean isTypeVertex(AtlasVertex vertex) {
+ boolean isTypeVertex(AtlasVertex vertex) {
String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class);
boolean ret = VERTEX_TYPE.equals(vertexType);
@@ -282,7 +282,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) {
+ boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) {
boolean ret = false;
if (isTypeVertex(vertex)) {
@@ -294,7 +294,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) {
+ boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) {
boolean ret = false;
if (isTypeVertex(vertex)) {
@@ -312,7 +312,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
+ AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
AtlasEdge ret = null;
Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
@@ -330,13 +330,13 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- public AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
+ AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
AtlasEdge ret = atlasGraph.addEdge(outVertex, inVertex, edgeLabel);
return ret;
}
- public void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory)
+ void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory)
throws AtlasBaseException {
Set<String> currentSuperTypes = getSuperTypeNames(vertex);
@@ -355,7 +355,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
}
}
- public Set<String> getSuperTypeNames(AtlasVertex vertex) {
+ Set<String> getSuperTypeNames(AtlasVertex vertex) {
Set<String> ret = new HashSet<>();
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL);
@@ -366,7 +366,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret;
}
- private TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) {
+ TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) {
TypeCategory ret = null;
if (typeDef instanceof AtlasEntityDef) {