You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/01/29 02:21:15 UTC
incubator-atlas git commit: ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads
Repository: incubator-atlas
Updated Branches:
refs/heads/master 57f4f79d6 -> 4b8b9e22f
ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/4b8b9e22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/4b8b9e22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/4b8b9e22
Branch: refs/heads/master
Commit: 4b8b9e22f8e279f739899bc50e20ffdae6142586
Parents: 57f4f79
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Jan 27 03:22:26 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Jan 28 17:47:43 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasErrorCode.java | 1 +
.../apache/atlas/type/AtlasTypeRegistry.java | 117 ++++++++-
.../org/apache/atlas/type/AtlasTypeUtil.java | 12 +-
.../org/apache/atlas/model/ModelTestUtil.java | 36 ++-
.../apache/atlas/type/TestAtlasEntityType.java | 51 ++--
.../atlas/type/TestAtlasTypeRegistry.java | 185 ++++++++++++++-
.../store/graph/AtlasTypeDefGraphStore.java | 236 ++++++-------------
.../graph/v1/AtlasTypeDefGraphStoreV1.java | 8 +-
.../util/AtlasRepositoryConfiguration.java | 21 ++
9 files changed, 450 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index e7dbb1c..0fb16c6 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -76,6 +76,7 @@ public enum AtlasErrorCode {
INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"),
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
+ FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(400, "ATLAS40018E", "Instance {0} with unique attribute {1} does not exist"),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 3de0215..3f3ea59 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -38,6 +38,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
@@ -51,15 +53,20 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUF
@Singleton
public class AtlasTypeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(AtlasStructType.class);
+ private static final int DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS = 15;
- protected RegistryData registryData;
+ protected RegistryData registryData;
+ private final TypeRegistryUpdateSynchronizer updateSynchronizer;
public AtlasTypeRegistry() {
- registryData = new RegistryData();
+ registryData = new RegistryData();
+ updateSynchronizer = new TypeRegistryUpdateSynchronizer(this);
}
+ // used only by AtlasTransientTypeRegistry
protected AtlasTypeRegistry(AtlasTypeRegistry other) {
- registryData = new RegistryData(other.registryData);
+ registryData = new RegistryData(other.registryData);
+ updateSynchronizer = other.updateSynchronizer;
}
public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); }
@@ -195,14 +202,19 @@ public class AtlasTypeRegistry {
public AtlasEntityType getEntityTypeByName(String name) { return registryData.entityDefs.getTypeByName(name); }
- public AtlasTransientTypeRegistry createTransientTypeRegistry() {
- return new AtlasTransientTypeRegistry(this);
+ public AtlasTransientTypeRegistry lockTypeRegistryForUpdate() throws AtlasBaseException {
+ return lockTypeRegistryForUpdate(DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS);
}
- public void commitTransientTypeRegistry(AtlasTransientTypeRegistry transientTypeRegistry) {
- this.registryData = transientTypeRegistry.registryData;
+ public AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
+ return updateSynchronizer.lockTypeRegistryForUpdate(lockMaxWaitTimeInSeconds);
}
+ public void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry transientTypeRegistry, boolean commitUpdates) {
+ updateSynchronizer.releaseTypeRegistryForUpdate(transientTypeRegistry, commitUpdates);
+ }
+
+
static class RegistryData {
final TypeCache allTypes;
final TypeDefCache<AtlasEnumDef, AtlasEnumType> enumDefs;
@@ -519,12 +531,16 @@ public class AtlasTypeRegistry {
public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; }
- private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) {
+ private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) throws AtlasBaseException{
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.addTypeWithNoRefResolve({})", typeDef);
}
if (typeDef != null) {
+ if (this.isRegisteredType(typeDef.getName())) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, typeDef.getName());
+ }
+
if (typeDef.getClass().equals(AtlasEnumDef.class)) {
AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
@@ -552,7 +568,7 @@ public class AtlasTypeRegistry {
}
}
- private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) {
+ private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.addTypesWithNoRefResolve(length={})",
(typeDefs == null ? 0 : typeDefs.size()));
@@ -681,6 +697,89 @@ public class AtlasTypeRegistry {
}
}
}
+
+ static class TypeRegistryUpdateSynchronizer {
+ private final AtlasTypeRegistry typeRegistry;
+ private final ReentrantLock typeRegistryUpdateLock;
+ private AtlasTransientTypeRegistry typeRegistryUnderUpdate = null;
+ private String lockedByThread = null;
+
+ TypeRegistryUpdateSynchronizer(AtlasTypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ this.typeRegistryUpdateLock = new ReentrantLock();
+ }
+
+ AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
+ LOG.debug("==> lockTypeRegistryForUpdate()");
+
+ boolean alreadyLockedByCurrentThread = typeRegistryUpdateLock.isHeldByCurrentThread();
+
+ if (!alreadyLockedByCurrentThread) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread);
+ }
+ } else {
+ LOG.warn("lockTypeRegistryForUpdate(): already locked. currentLockCount={}",
+ typeRegistryUpdateLock.getHoldCount());
+ }
+
+ try {
+ boolean isLocked = typeRegistryUpdateLock.tryLock(lockMaxWaitTimeInSeconds, TimeUnit.SECONDS);
+
+ if (!isLocked) {
+ throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK);
+ }
+ } catch (InterruptedException excp) {
+ throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK, excp);
+ }
+
+ if (!alreadyLockedByCurrentThread) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lockTypeRegistryForUpdate(): wait over..got the lock");
+ }
+
+ typeRegistryUnderUpdate = new AtlasTransientTypeRegistry(typeRegistry);
+ lockedByThread = Thread.currentThread().getName();
+ }
+
+ LOG.debug("<== lockTypeRegistryForUpdate()");
+
+ return typeRegistryUnderUpdate;
+ }
+
+ void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry ttr, boolean commitUpdates) {
+ LOG.debug("==> releaseTypeRegistryForUpdate()");
+
+ if (typeRegistryUpdateLock.isHeldByCurrentThread()) {
+ try {
+ if (typeRegistryUnderUpdate != ttr) {
+ LOG.error("releaseTypeRegistryForUpdate(): incorrect typeRegistry returned for release" +
+ ": found=" + ttr + "; expected=" + typeRegistryUnderUpdate,
+ new Exception().fillInStackTrace());
+ } else if (typeRegistryUpdateLock.getHoldCount() == 1) {
+ if (ttr != null && commitUpdates) {
+ typeRegistry.registryData = ttr.registryData;
+ }
+ }
+
+ if (typeRegistryUpdateLock.getHoldCount() == 1) {
+ lockedByThread = null;
+ typeRegistryUnderUpdate = null;
+ } else {
+ LOG.warn("releaseTypeRegistryForUpdate(): pendingReleaseCount={}", typeRegistryUpdateLock.getHoldCount() - 1);
+ }
+ } finally {
+ typeRegistryUpdateLock.unlock();
+ }
+ } else {
+ LOG.error("releaseTypeRegistryForUpdate(): current thread does not hold the lock",
+ new Exception().fillInStackTrace());
+ }
+
+ LOG.debug("<== releaseTypeRegistryForUpdate()");
+ }
+
+ }
}
class TypeCache {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 089bebee..e4f1eea 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas.type;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasErrorCode;
@@ -36,7 +35,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -52,21 +50,19 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_S
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX;
-import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_REF_ATTRIBUTE;
-import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_MAPPED_FROM_REF;
/**
* Utility methods for AtlasType/AtlasTypeDef.
*/
public class AtlasTypeUtil {
private static final Set<String> ATLAS_BUILTIN_TYPENAMES = new HashSet<>();
- private static final String NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ ]*";
+ private static final String NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ ]*";
private static final String TRAIT_NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ .]*";
- private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX);
+ private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX);
private static final Pattern TRAIT_NAME_PATTERN = Pattern.compile(TRAIT_NAME_REGEX);
- private static final String InvalidTypeNameErrorMessage = "Names must consist of a letter followed by a sequence of letter, number, or '_' characters.";
- private static final String InvalidTraitTypeNameErrorMessage = "Names must consist of a leter followed by a sequence of letters, numbers, '.', or '_' characters.";
+ private static final String InvalidTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_' ] characters.";
+ private static final String InvalidTraitTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_', '.' ] characters.";
static {
Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
index 6d3c312..5c72470 100644
--- a/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
+++ b/intg/src/test/java/org/apache/atlas/model/ModelTestUtil.java
@@ -158,16 +158,21 @@ public final class ModelTestUtil {
ret.setDefaultValue(ret.getElementDefs().get(idxDefault).getValue());
}
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+
try {
- AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+ ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
- typesRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create enum-def", excp);
ret = null;
+ } finally {
+ typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
@@ -186,16 +191,21 @@ public final class ModelTestUtil {
ret.setDescription(ret.getName());
ret.setAttributeDefs(newAttributeDefsWithAllBuiltInTypes(PREFIX_ATTRIBUTE_NAME));
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+
try {
- AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+ ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
- typesRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create struct-def", excp);
ret = null;
+ } finally {
+ typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
@@ -228,16 +238,21 @@ public final class ModelTestUtil {
}
}
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+
try {
- AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+ ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
- typesRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create entity-def", excp);
ret = null;
+ } finally {
+ typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
@@ -279,16 +294,21 @@ public final class ModelTestUtil {
}
}
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+
try {
- AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
+ ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
- typesRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create classification-def", excp);
ret = null;
+ } finally {
+ typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
index 710840f..4e15edd 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasEntityType.java
@@ -124,21 +124,25 @@ public class TestAtlasEntityType {
@Test
public void testForeignKeyConstraintValid() {
- AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- List<AtlasEntityDef> entityDefs = new ArrayList<>();
- String failureMsg = null;
+ AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+ List<AtlasEntityDef> entityDefs = new ArrayList<>();
+ String failureMsg = null;
entityDefs.add(createTableEntityDef());
entityDefs.add(createColumnEntityDef());
try {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
- typeRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNull(failureMsg, "failed to create types my_table and my_column");
}
@@ -151,55 +155,68 @@ public class TestAtlasEntityType {
entityDefs.add(createTableEntityDef());
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+
try {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
- typeRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in mappedFromRef");
}
@Test
public void testForeignKeyConstraintInValidMappedFromRef2() {
- AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- List<AtlasEntityDef> entityDefs = new ArrayList<>();
- String failureMsg = null;
+ AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+ List<AtlasEntityDef> entityDefs = new ArrayList<>();
+ String failureMsg = null;
entityDefs.add(createTableEntityDefWithMissingRefAttribute());
entityDefs.add(createColumnEntityDef());
try {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
- typeRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - missing refAttribute in mappedFromRef");
}
@Test
public void testForeignKeyConstraintInValidForeignKey() {
- AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- List<AtlasEntityDef> entityDefs = new ArrayList<>();
- String failureMsg = null;
+ AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+ List<AtlasEntityDef> entityDefs = new ArrayList<>();
+ String failureMsg = null;
entityDefs.add(createColumnEntityDef());
try {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
- typeRegistry.commitTransientTypeRegistry(ttr);
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in foreignKey");
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
index 60a09a1..f93a2e8 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasTypeRegistry.java
@@ -19,11 +19,8 @@ package org.apache.atlas.type;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.model.typedef.AtlasClassificationDef;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.*;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
import org.testng.annotations.Test;
@@ -31,6 +28,10 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.testng.Assert.*;
@@ -82,17 +83,23 @@ public class TestAtlasTypeRegistry {
typesDef.getClassificationDefs().add(classifiL2_4);
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addTypes(typesDef);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNull(failureMsg);
- typeRegistry.commitTransientTypeRegistry(ttr);
validateSuperTypes(typeRegistry, "L0", new HashSet<String>());
validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0")));
@@ -126,13 +133,20 @@ public class TestAtlasTypeRegistry {
classifiDef1.addSuperType(classifiDef1.getName());
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addType(classifiDef1);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid supertype failure");
}
@@ -178,13 +192,20 @@ public class TestAtlasTypeRegistry {
typesDef.getClassificationDefs().add(classifiL2_4);
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addTypes(typesDef);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid supertype failure");
}
@@ -235,18 +256,23 @@ public class TestAtlasTypeRegistry {
typesDef.getEntityDefs().add(entL2_4);
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addTypes(typesDef);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNull(failureMsg);
- typeRegistry.commitTransientTypeRegistry(ttr);
-
validateSuperTypes(typeRegistry, "L0", new HashSet<String>());
validateSuperTypes(typeRegistry, "L1-1", new HashSet<>(Arrays.asList("L0")));
validateSuperTypes(typeRegistry, "L1-2", new HashSet<>(Arrays.asList("L0")));
@@ -279,13 +305,20 @@ public class TestAtlasTypeRegistry {
entDef1.addSuperType(entDef1.getName());
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addType(entDef1);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid supertype failure");
}
@@ -331,17 +364,143 @@ public class TestAtlasTypeRegistry {
typesDef.getEntityDefs().add(entL2_4);
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
String failureMsg = null;
try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
ttr.addTypes(typesDef);
+
+ commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid supertype failure");
}
+ @Test
+ public void testNestedUpdates() {
+ AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commit = false;
+ String failureMsg = null;
+ AtlasClassificationDef testTag1 = new AtlasClassificationDef("testTag1");
+ AtlasClassificationDef testTag2 = new AtlasClassificationDef("testTag2");
+
+ try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
+ ttr.addType(testTag1);
+
+ // changes should not be seen in typeRegistry until lock is released
+ assertFalse(typeRegistry.isRegisteredType(testTag1.getName()),
+ "type added should be seen in typeRegistry only after commit");
+
+ boolean isNestedUpdateSuccess = addType(typeRegistry, testTag2);
+
+ assertTrue(isNestedUpdateSuccess);
+
+ // changes made in nested commit, inside addType(), should not be seen in typeRegistry until lock is released here
+ assertFalse(typeRegistry.isRegisteredType(testTag2.getName()),
+ "type added within nested commit should be seen in typeRegistry only after outer commit");
+
+ commit = true;
+ } catch (AtlasBaseException excp) {
+ failureMsg = excp.getMessage();
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
+ }
+ assertNull(failureMsg);
+ assertTrue(typeRegistry.isRegisteredType(testTag1.getName()));
+ assertTrue(typeRegistry.isRegisteredType(testTag2.getName()));
+ }
+
+ @Test
+ public void testParallelUpdates() {
+ final int numOfThreads = 3;
+ final int numOfTypesPerKind = 30;
+ final String enumTypePrefix = "testEnum-";
+ final String structTypePrefix = "testStruct-";
+ final String classificationPrefix = "testTag-";
+ final String entityTypePrefix = "testEntity-";
+
+ ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
+
+ final AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+
+ // update typeRegistry simultaneously in multiple threads
+ for (int threadIdx = 0; threadIdx < numOfThreads; threadIdx++) {
+ executor.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ for (int i = 0; i < numOfTypesPerKind; i++) {
+ addType(typeRegistry, new AtlasEnumDef(enumTypePrefix + i));
+ }
+
+ for (int i = 0; i < numOfTypesPerKind; i++) {
+ addType(typeRegistry, new AtlasStructDef(structTypePrefix + i));
+ }
+
+ for (int i = 0; i < numOfTypesPerKind; i++) {
+ addType(typeRegistry, new AtlasClassificationDef(classificationPrefix + i));
+ }
+
+ for (int i = 0; i < numOfTypesPerKind; i++) {
+ addType(typeRegistry, new AtlasEntityDef(entityTypePrefix + i));
+ }
+
+ return null;
+ }
+ });
+ }
+
+ executor.shutdown();
+
+ try {
+ boolean isCompleted = executor.awaitTermination(60, TimeUnit.SECONDS);
+
+ assertTrue(isCompleted, "threads did not complete updating types");
+ } catch (InterruptedException excp) {
+ // ignore?
+ }
+
+ // verify that all types added are present in the typeRegistry
+ for (int i = 0; i < numOfTypesPerKind; i++) {
+ String enumType = enumTypePrefix + i;
+ String structType = structTypePrefix + i;
+ String classificationType = classificationPrefix + i;
+ String entityType = entityTypePrefix + i;
+
+ assertNotNull(typeRegistry.getEnumDefByName(enumType), enumType + ": enum not found");
+ assertNotNull(typeRegistry.getStructDefByName(structType), structType + ": struct not found");
+ assertNotNull(typeRegistry.getClassificationDefByName(classificationType), classificationType + ": classification not found");
+ assertNotNull(typeRegistry.getEntityDefByName(entityType), entityType + ": entity not found");
+ }
+ }
+
+ private boolean addType(AtlasTypeRegistry typeRegistry, AtlasBaseTypeDef typeDef) {
+ boolean ret = false;
+ AtlasTransientTypeRegistry ttr = null;
+
+ try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate();
+
+ ttr.addType(typeDef);
+
+ ret = true;
+ } catch (AtlasBaseException excp) {
+ // ignore
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, ret);
+ }
+
+ return ret;
+ }
+
private void validateSuperTypes(AtlasTypeRegistry typeRegistry, String typeName, Set<String> expectedSuperTypes) {
AtlasType type = null;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index f7c2931..433d09c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -43,6 +43,7 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
+import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.Transformer;
@@ -65,14 +66,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class);
- private final AtlasTypeRegistry typeRegistry;
-
+ private final AtlasTypeRegistry typeRegistry;
private final Set<TypeDefChangeListener> typeDefChangeListeners;
+ private final int typeUpdateLockMaxWaitTimeSeconds;
protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry,
Set<TypeDefChangeListener> typeDefChangeListeners) {
- this.typeRegistry = typeRegistry;
- this.typeDefChangeListeners = typeDefChangeListeners;
+ this.typeRegistry = typeRegistry;
+ this.typeDefChangeListeners = typeDefChangeListeners;
+ this.typeUpdateLockMaxWaitTimeSeconds = AtlasRepositoryConfiguration.getTypeUpdateLockMaxWaitTimeInSeconds();
}
protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry);
@@ -85,16 +87,23 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@Override
public void init() throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = null;
+ boolean commitUpdates = false;
- AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
- getStructDefStore(ttr).getAll(),
- getClassificationDefStore(ttr).getAll(),
- getEntityDefStore(ttr).getAll());
+ try {
+ ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
- ttr.addTypes(typesDef);
+ AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
+ getStructDefStore(ttr).getAll(),
+ getClassificationDefStore(ttr).getAll(),
+ getEntityDefStore(ttr).getAll());
- typeRegistry.commitTransientTypeRegistry(ttr);
+ ttr.addTypes(typesDef);
+
+ commitUpdates = true;
+ } finally {
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates);
+ }
bootstrapTypes();
}
@@ -102,7 +111,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@Override
@GraphTransaction
public AtlasEnumDef createEnumDef(AtlasEnumDef enumDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addType(enumDef);
@@ -110,22 +119,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- updateTypeRegistryPostCommit(ttr);
-
return ret;
}
@Override
@GraphTransaction
public List<AtlasEnumDef> getAllEnumDefs() throws AtlasBaseException {
- List<AtlasEnumDef> ret = null;
-
Collection<AtlasEnumDef> enumDefs = typeRegistry.getAllEnumDefs();
- ret = CollectionUtils.isNotEmpty(enumDefs) ?
- new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList();
-
- return ret;
+ return CollectionUtils.isNotEmpty(enumDefs) ? new ArrayList<>(enumDefs) : Collections.<AtlasEnumDef>emptyList();
}
@Override
@@ -151,70 +153,53 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@Override
@GraphTransaction
public AtlasEnumDef updateEnumDefByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByName(name, enumDef);
- AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getEnumDefStore(ttr).updateByName(name, enumDef);
}
@Override
@GraphTransaction
public AtlasEnumDef updateEnumDefByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByGuid(guid, enumDef);
- AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getEnumDefStore(ttr).updateByGuid(guid, enumDef);
}
@Override
@GraphTransaction
public void deleteEnumDefByName(String name) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasEnumDef byName = typeRegistry.getEnumDefByName(name);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByName(name);
getEnumDefStore(ttr).deleteByName(name);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public void deleteEnumDefByGuid(String guid) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasEnumDef byGuid = typeRegistry.getEnumDefByGuid(guid);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByGuid(guid);
getEnumDefStore(ttr).deleteByGuid(guid);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public AtlasEnumDefs searchEnumDefs(SearchFilter filter) throws AtlasBaseException {
- AtlasEnumDefs search = getEnumDefStore(typeRegistry).search(filter);
- return search;
+ return getEnumDefStore(typeRegistry).search(filter);
}
@Override
@GraphTransaction
public AtlasStructDef createStructDef(AtlasStructDef structDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addType(structDef);
@@ -222,31 +207,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- updateTypeRegistryPostCommit(ttr);
-
return ret;
}
@Override
@GraphTransaction
public List<AtlasStructDef> getAllStructDefs() throws AtlasBaseException {
- List<AtlasStructDef> ret = null;
-
Collection<AtlasStructDef> structDefs = typeRegistry.getAllStructDefs();
- ret = CollectionUtils.isNotEmpty(structDefs) ?
- new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList();
-
- return ret;
+ return CollectionUtils.isNotEmpty(structDefs) ? new ArrayList<>(structDefs) : Collections.<AtlasStructDef>emptyList();
}
@Override
@GraphTransaction
public AtlasStructDef getStructDefByName(String name) throws AtlasBaseException {
AtlasStructDef ret = typeRegistry.getStructDefByName(name);
+
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
+
return ret;
}
@@ -254,81 +234,65 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@GraphTransaction
public AtlasStructDef getStructDefByGuid(String guid) throws AtlasBaseException {
AtlasStructDef ret = typeRegistry.getStructDefByGuid(guid);
+
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
+
return ret;
}
@Override
@GraphTransaction
public AtlasStructDef updateStructDefByName(String name, AtlasStructDef structDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByName(name, structDef);
- AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getStructDefStore(ttr).updateByName(name, structDef);
}
@Override
@GraphTransaction
public AtlasStructDef updateStructDefByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByGuid(guid, structDef);
- AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getStructDefStore(ttr).updateByGuid(guid, structDef);
}
@Override
@GraphTransaction
public void deleteStructDefByName(String name) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasStructDef byName = typeRegistry.getStructDefByName(name);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByName(name);
getStructDefStore(ttr).deleteByName(name, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public void deleteStructDefByGuid(String guid) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasStructDef byGuid = typeRegistry.getStructDefByGuid(guid);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByGuid(guid);
getStructDefStore(ttr).deleteByGuid(guid, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public AtlasStructDefs searchStructDefs(SearchFilter filter) throws AtlasBaseException {
- AtlasStructDefs search = getStructDefStore(typeRegistry).search(filter);
-
- return search;
+ return getStructDefStore(typeRegistry).search(filter);
}
@Override
@GraphTransaction
public AtlasClassificationDef createClassificationDef(AtlasClassificationDef classificationDef)
throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addType(classificationDef);
@@ -336,22 +300,16 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- updateTypeRegistryPostCommit(ttr);
-
return ret;
}
@Override
@GraphTransaction
public List<AtlasClassificationDef> getAllClassificationDefs() throws AtlasBaseException {
- List<AtlasClassificationDef> ret = null;
-
Collection<AtlasClassificationDef> classificationDefs = typeRegistry.getAllClassificationDefs();
- ret = CollectionUtils.isNotEmpty(classificationDefs) ?
- new ArrayList<>(classificationDefs) : Collections.<AtlasClassificationDef>emptyList();
-
- return ret;
+ return CollectionUtils.isNotEmpty(classificationDefs) ? new ArrayList<>(classificationDefs)
+ : Collections.<AtlasClassificationDef>emptyList();
}
@Override
@@ -362,6 +320,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
+
return ret;
}
@@ -369,9 +328,11 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@GraphTransaction
public AtlasClassificationDef getClassificationDefByGuid(String guid) throws AtlasBaseException {
AtlasClassificationDef ret = typeRegistry.getClassificationDefByGuid(guid);
+
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
+
return ret;
}
@@ -379,72 +340,54 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@GraphTransaction
public AtlasClassificationDef updateClassificationDefByName(String name, AtlasClassificationDef classificationDef)
throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByName(name, classificationDef);
- AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getClassificationDefStore(ttr).updateByName(name, classificationDef);
}
@Override
@GraphTransaction
public AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassificationDef classificationDef)
throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByGuid(guid, classificationDef);
- AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
}
@Override
@GraphTransaction
public void deleteClassificationDefByName(String name) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasClassificationDef byName = typeRegistry.getClassificationDefByName(name);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByName(name);
getClassificationDefStore(ttr).deleteByName(name, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public void deleteClassificationDefByGuid(String guid) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasClassificationDef byGuid = typeRegistry.getClassificationDefByGuid(guid);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByGuid(guid);
getClassificationDefStore(ttr).deleteByGuid(guid, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public AtlasClassificationDefs searchClassificationDefs(SearchFilter filter) throws AtlasBaseException {
- AtlasClassificationDefs search = getClassificationDefStore(typeRegistry).search(filter);
-
- return search;
+ return getClassificationDefStore(typeRegistry).search(filter);
}
@Override
@GraphTransaction
public AtlasEntityDef createEntityDef(AtlasEntityDef entityDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addType(entityDef);
@@ -452,31 +395,26 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid());
- updateTypeRegistryPostCommit(ttr);
-
return ret;
}
@Override
@GraphTransaction
public List<AtlasEntityDef> getAllEntityDefs() throws AtlasBaseException {
- List<AtlasEntityDef> ret = null;
-
Collection<AtlasEntityDef> entityDefs = typeRegistry.getAllEntityDefs();
- ret = CollectionUtils.isNotEmpty(entityDefs) ?
- new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList();
-
- return ret;
+ return CollectionUtils.isNotEmpty(entityDefs) ? new ArrayList<>(entityDefs) : Collections.<AtlasEntityDef>emptyList();
}
@Override
@GraphTransaction
public AtlasEntityDef getEntityDefByName(String name) throws AtlasBaseException {
AtlasEntityDef ret = typeRegistry.getEntityDefByName(name);
+
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
+
return ret;
}
@@ -484,74 +422,58 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@GraphTransaction
public AtlasEntityDef getEntityDefByGuid(String guid) throws AtlasBaseException {
AtlasEntityDef ret = typeRegistry.getEntityDefByGuid(guid);
+
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
+
return ret;
}
@Override
@GraphTransaction
public AtlasEntityDef updateEntityDefByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByName(name, entityDef);
- AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getEntityDefStore(ttr).updateByName(name, entityDef);
}
@Override
@GraphTransaction
public AtlasEntityDef updateEntityDefByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypeByGuid(guid, entityDef);
- AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef);
-
- updateTypeRegistryPostCommit(ttr);
-
- return ret;
+ return getEntityDefStore(ttr).updateByGuid(guid, entityDef);
}
@Override
@GraphTransaction
public void deleteEntityDefByName(String name) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasEntityDef byName = typeRegistry.getEntityDefByName(name);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByName(name);
getEntityDefStore(ttr).deleteByName(name, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public void deleteEntityDefByGuid(String guid) throws AtlasBaseException {
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
-
- AtlasEntityDef byGuid = typeRegistry.getEntityDefByGuid(guid);
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.removeTypeByGuid(guid);
getEntityDefStore(ttr).deleteByGuid(guid, null);
-
- updateTypeRegistryPostCommit(ttr);
}
@Override
@GraphTransaction
public AtlasEntityDefs searchEntityDefs(SearchFilter filter) throws AtlasBaseException {
- AtlasEntityDefs search = getEntityDefStore(typeRegistry).search(filter);
-
- return search;
+ return getEntityDefStore(typeRegistry).search(filter);
}
@Override
@@ -567,7 +489,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasTypesDef ret = new AtlasTypesDef();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addTypes(typesDef);
@@ -644,8 +566,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- updateTypeRegistryPostCommit(ttr);
-
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
@@ -670,7 +590,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasTypesDef ret = new AtlasTypesDef();
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypes(typesDef);
@@ -703,8 +623,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- updateTypeRegistryPostCommit(ttr);
-
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
@@ -728,7 +646,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
CollectionUtils.size(typesDef.getEntityDefs()));
}
- AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
+ AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addTypes(typesDef);
@@ -817,8 +735,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
- updateTypeRegistryPostCommit(ttr);
-
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
@@ -934,8 +850,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
storeInitializer.initializeStore(this, typeRegistry, typesDirName);
}
- private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) {
+ private AtlasTransientTypeRegistry lockTypeRegistryAndReleasePostCommit() throws AtlasBaseException {
+ AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
+
new TypeRegistryUpdateHook(ttr);
+
+ return ttr;
}
private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
@@ -953,9 +873,9 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
- if (isSuccess) {
- typeRegistry.commitTransientTypeRegistry(ttr);
+ typeRegistry.releaseTypeRegistryForUpdate(ttr, isSuccess);
+ if (isSuccess) {
notifyListeners(ttr);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
index 287ef09..88197ac 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java
@@ -68,7 +68,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
Set<TypeDefChangeListener> typeDefChangeListeners) {
super(typeRegistry, typeDefChangeListeners);
- LOG.info("==> AtlasTypeDefGraphStoreV1()");
+ LOG.debug("==> AtlasTypeDefGraphStoreV1()");
try {
init();
@@ -76,7 +76,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.error("failed to initialize types from graph store", excp);
}
- LOG.info("<== AtlasTypeDefGraphStoreV1()");
+ LOG.debug("<== AtlasTypeDefGraphStoreV1()");
}
@Override
@@ -101,11 +101,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
@Override
public void init() throws AtlasBaseException {
- LOG.info("==> AtlasTypeDefGraphStoreV1.init()");
+ LOG.debug("==> AtlasTypeDefGraphStoreV1.init()");
super.init();
- LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
+ LOG.debug("<== AtlasTypeDefGraphStoreV1.init()");
}
AtlasGraph getAtlasGraph() { return atlasGraph; }
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4b8b9e22/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
index 6655085..71c7ff8 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
@@ -48,6 +48,10 @@ public class AtlasRepositoryConfiguration {
private static List<String> skippedOperations = null;
public static final String SEPARATOR = ":";
+ private static final String CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = "atlas.server.type.update.lock.max.wait.time.seconds";
+ private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15);
+ private static Integer typeUpdateLockMaxWaitTimeInSeconds = null;
+
@SuppressWarnings("unchecked")
public static Class<? extends TypeCache> getTypeCache() {
// Get the type cache implementation class from Atlas configuration.
@@ -155,4 +159,21 @@ public class AtlasRepositoryConfiguration {
skippedOperations = null;
}
+ public static int getTypeUpdateLockMaxWaitTimeInSeconds() {
+ Integer ret = typeUpdateLockMaxWaitTimeInSeconds;
+
+ if (ret == null) {
+ try {
+ Configuration config = ApplicationProperties.get();
+
+ ret = config.getInteger(CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS, DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS);
+
+ typeUpdateLockMaxWaitTimeInSeconds = ret;
+ } catch (AtlasException e) {
+ // ignore: if the configuration cannot be loaded, the default wait time is returned below
+ }
+ }
+
+ return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret;
+ }
}