You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/26 15:14:13 UTC
[3/5] incubator-atlas git commit: ATLAS-1889: fix to handle concurrent calls to update tags for an entity
ATLAS-1889: fix to handle concurrent calls to update tags for an entity
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bcb128af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bcb128af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bcb128af
Branch: refs/heads/feature-odf
Commit: bcb128af3283432a1da3ab4a524cd200f647f862
Parents: e0abdb3
Author: ashutoshm <am...@hortonworks.com>
Authored: Thu Jun 22 07:35:04 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Jun 22 10:20:14 2017 -0700
----------------------------------------------------------------------
.../atlas/GraphTransactionInterceptor.java | 120 ++++++++++
.../graph/GraphBackedMetadataRepository.java | 28 ++-
.../store/graph/v1/AtlasEntityStoreV1.java | 7 +
.../utils/ObjectUpdateSynchronizerTest.java | 218 +++++++++++++++++++
4 files changed, 364 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 7d3bdf7..c6a4bbe 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -17,6 +17,7 @@
package org.apache.atlas;
+import com.google.common.annotations.VisibleForTesting;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
@@ -29,12 +30,18 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
@Component
public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
+ @VisibleForTesting
+ private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
private final AtlasGraph graph;
@@ -82,9 +89,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
}
}
+
+ OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
}
}
+ /**
+ * Locks the object identified by {@code guid} by delegating to the shared
+ * {@code OBJECT_UPDATE_SYNCHRONIZER}. The call blocks until the lock is obtained.
+ * The lock is not released here; the synchronizer records it for the calling thread and
+ * frees it when {@code OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects()} runs.
+ * NOTE(review): the method name implies the release happens after the transaction commits;
+ * confirm against the full interceptor, which is only partly visible in this diff.
+ *
+ * @param guid identifier of the object to lock
+ */
+ public static void lockObjectAndReleasePostCommit(final String guid) {
+ OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid);
+ }
+
+ /**
+ * Locks every object identified by the given guids by delegating to the shared
+ * {@code OBJECT_UPDATE_SYNCHRONIZER}. The locks are not released here; the synchronizer
+ * records them for the calling thread and frees them when
+ * {@code OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects()} runs.
+ * Note that the synchronizer sorts the list it is given (see {@code lockObject(List)}).
+ *
+ * @param guids identifiers of the objects to lock
+ */
+ public static void lockObjectAndReleasePostCommit(final List<String> guids) {
+ OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids);
+ }
+
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
@@ -110,4 +127,107 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
public abstract void onComplete(boolean isSuccess);
}
+
+ /**
+ * A {@link ReentrantLock} that additionally carries a plain integer reference count.
+ * The count is an ordinary {@code int} with no synchronization of its own, so callers
+ * must provide their own mutual exclusion when changing it; in this file
+ * {@code increment()} and {@code decrement()} are only called inside
+ * {@code synchronized (guidLockMap)} blocks.
+ */
+ private static class RefCountedReentrantLock extends ReentrantLock {
+ private int refCount; // number of outstanding users (holders plus waiters) of this lock
+
+ public RefCountedReentrantLock() {
+ this.refCount = 0;
+ }
+
+ // Adds one user and returns the updated count.
+ public int increment() {
+ return ++refCount;
+ }
+
+ // Removes one user and returns the updated count.
+ public int decrement() {
+ return --refCount;
+ }
+
+ // Returns the current count without changing it.
+ public int getRefCount() { return refCount; }
+ }
+
+
+ /**
+  * Serializes updates to objects identified by guid.
+  *
+  * <p>Each guid that is currently locked, or being waited for, has one
+  * {@link RefCountedReentrantLock} in {@code guidLockMap}. A thread acquires locks through
+  * {@code lockObject(...)}; the guids it acquired are remembered per thread and are all
+  * released together by {@link #releaseLockedObjects()}, which must be called on the same
+  * thread that acquired them.
+  */
+ public static class ObjectUpdateSynchronizer {
+     // guid -> lock. Every access to this map, and every change to a lock's reference count,
+     // happens inside synchronized (guidLockMap).
+     private final Map<String, RefCountedReentrantLock> guidLockMap = new ConcurrentHashMap<>();
+
+     // guids locked by the current thread, one entry per successful lockObject(String) call.
+     private final ThreadLocal<List<String>> lockedGuids = new ThreadLocal<List<String>>() {
+         @Override
+         protected List<String> initialValue() {
+             return new ArrayList<>();
+         }
+     };
+
+     /**
+      * Locks all given guids, blocking until every lock is held by the calling thread.
+      * Locks are always acquired in the natural (sorted) order of the guids so that two
+      * threads asking for overlapping sets cannot deadlock on each other.
+      *
+      * <p>The caller's list is left untouched: sorting is done on a private copy. Sorting
+      * the argument in place would silently reorder a list the caller may still iterate,
+      * and would throw {@link UnsupportedOperationException} for unmodifiable lists.
+      *
+      * @param guids guids to lock; must not be {@code null} or contain {@code null}
+      */
+     public void lockObject(final List<String> guids) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("==> lockObject(): guids: {}", guids);
+         }
+
+         final List<String> sortedGuids = new ArrayList<>(guids);
+         Collections.sort(sortedGuids);
+
+         for (String g : sortedGuids) {
+             lockObject(g);
+         }
+     }
+
+     /**
+      * Locks a single guid, blocking until the lock is available, and records it in the
+      * calling thread's list so {@link #releaseLockedObjects()} can free it later.
+      *
+      * @param guid guid to lock
+      */
+     private void lockObject(final String guid) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("==> lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
+         }
+
+         ReentrantLock lock = getOrCreateObjectLock(guid);
+         lock.lock();
+
+         lockedGuids.get().add(guid);
+
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("<== lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
+         }
+     }
+
+     /**
+      * Releases every lock recorded for the calling thread and then clears that record.
+      * Calling it when the thread holds nothing is a no-op.
+      */
+     public void releaseLockedObjects() {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("==> releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
+         }
+
+         for (String guid : lockedGuids.get()) {
+             releaseObjectLock(guid);
+         }
+
+         lockedGuids.get().clear();
+
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("<== releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
+         }
+     }
+
+     /**
+      * Returns the lock registered for {@code guid}, creating and registering it if needed.
+      * The reference count is incremented here, before the caller blocks on the lock, so a
+      * thread that is still waiting keeps the map entry from being removed by the releaser.
+      */
+     private RefCountedReentrantLock getOrCreateObjectLock(String guid) {
+         synchronized (guidLockMap) {
+             RefCountedReentrantLock ret = guidLockMap.get(guid);
+             if (ret == null) {
+                 ret = new RefCountedReentrantLock();
+                 guidLockMap.put(guid, ret);
+             }
+
+             ret.increment();
+             return ret;
+         }
+     }
+
+     /**
+      * Unlocks the lock registered for {@code guid} if the calling thread holds it, and
+      * drops the map entry once no thread is using it any more. If the lock is missing or
+      * held by another thread, only a warning is logged.
+      *
+      * @return the lock found for {@code guid}, or {@code null} if none was registered
+      */
+     private RefCountedReentrantLock releaseObjectLock(String guid) {
+         synchronized (guidLockMap) {
+             RefCountedReentrantLock lock = guidLockMap.get(guid);
+             if (lock != null && lock.isHeldByCurrentThread()) {
+                 int refCount = lock.decrement();
+
+                 // Last user gone: remove the entry so the map does not grow without bound.
+                 if (refCount == 0) {
+                     guidLockMap.remove(guid);
+                 }
+
+                 lock.unlock();
+             } else {
+                 LOG.warn("releaseLockedObjects: {} Attempting to release a lock not held by current thread.", guid);
+             }
+
+             return lock;
+         }
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 5bec8fa..0f3b06b 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.graph;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasException;
import org.apache.atlas.CreateUpdateEntitiesResult;
+import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.model.instance.GuidMapping;
@@ -49,7 +50,16 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.inject.Singleton;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* An implementation backed by a Graph database provided
@@ -303,6 +313,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.debug("Adding a new trait={} for entities={}", traitInstance.getTypeName(), entityGuids);
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuids);
for (String entityGuid : entityGuids) {
addTraitImpl(entityGuid, traitInstance);
}
@@ -321,12 +332,12 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
Preconditions.checkNotNull(guid, "guid cannot be null");
Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
addTraitImpl(guid, traitInstance);
}
private void addTraitImpl(String guid, ITypedStruct traitInstance) throws RepositoryException {
final String traitName = traitInstance.getTypeName();
-
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a new trait={} for entity={}", traitName, guid);
}
@@ -365,9 +376,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
- }
+ LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid);
@@ -383,11 +393,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
if(edge != null) {
deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
-
- // update the traits in entity once trait removal is successful
- traitNames.remove(traitNameToBeDeleted);
- updateTraits(instanceVertex, traitNames);
}
+
+ // update the traits in entity once trait removal is successful
+ traitNames.remove(traitNameToBeDeleted);
+ updateTraits(instanceVertex, traitNames);
} catch (Exception e) {
throw new RepositoryException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 75e9132..5ea4ff2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
@@ -456,6 +457,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Adding classifications={} to entity={}", classifications, guid);
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
validateAndNormalize(classification);
}
@@ -484,6 +486,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
for (AtlasClassification newClassification : newClassifications) {
@@ -527,6 +530,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Adding classification={} to entities={}", classification, guids);
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
+
validateAndNormalize(classification);
List<AtlasClassification> classifications = Collections.singletonList(classification);
@@ -557,6 +562,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+
entityGraphMapper.deleteClassifications(guid, classificationNames);
// notify listeners on classification deletion
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
new file mode 100644
index 0000000..03ebae4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.springframework.util.CollectionUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+/**
+ * Tests for {@code GraphTransactionInterceptor.ObjectUpdateSynchronizer}.
+ *
+ * Each {@code CounterThread} locks its ids, appends 0..MAX_COUNT-1 to the shared
+ * {@code outputList} and then releases its locks. Tests then compare {@code outputList}
+ * with the list produced by {@code populateExpectedArrayOutput}, i.e. the output that
+ * results when the threads' critical sections run strictly one after another.
+ */
+public class ObjectUpdateSynchronizerTest {
+ private static final GraphTransactionInterceptor.ObjectUpdateSynchronizer objectUpdateSynchronizer = new GraphTransactionInterceptor.ObjectUpdateSynchronizer();
+
+ // Shared by all counter threads; cleared before every test method.
+ // NOTE(review): this is a plain ArrayList, so in the tests that expect threads NOT to be
+ // serialized, concurrent add() calls are unsynchronized - confirm this is acceptable.
+ private final List<Integer> outputList = new ArrayList<>();
+ private final int MAX_COUNT = 10;
+
+ // Worker that holds the locks for its ids while writing MAX_COUNT values to outputList.
+ class CounterThread extends Thread {
+ String ids[];
+ public CounterThread(String id) {
+ this.ids = new String[1];
+ this.ids[0] = id;
+ }
+
+ // Replaces the ids to lock; intended to be called before start().
+ public void setIds(String... ids) {
+ this.ids = ids;
+ }
+
+ public void run() {
+ objectUpdateSynchronizer.lockObject(CollectionUtils.arrayToList(ids));
+ for (int i = 0; i < MAX_COUNT; i++) {
+ outputList.add(i);
+ // Result is discarded; presumably just busy work to lengthen the critical section.
+ RandomStringUtils.randomAlphabetic(20);
+ }
+
+ objectUpdateSynchronizer.releaseLockedObjects();
+ }
+ }
+
+ @BeforeMethod
+ public void clearOutputList() {
+ outputList.clear();
+ }
+
+ @Test
+ public void singleThreadRun() throws InterruptedException {
+ verifyMultipleThreadRun(1);
+ }
+
+ // Threads use distinct ids ("0" and "1"), so they share no lock.
+ @Test
+ public void twoThreadsAccessingDifferntGuids_DoNotSerialize() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 2);
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayNotEquals(populateExpectedArrayOutput(2));
+ }
+
+ @Test
+ public void twoThreadsAccessingSameGuid_Serialize() throws InterruptedException {
+ verifyMultipleThreadRun(2);
+ }
+
+ @Test
+ public void severalThreadsAccessingSameGuid_Serialize() throws InterruptedException {
+ verifyMultipleThreadRun(10);
+ }
+
+ // Every thread's id list contains "1", so all threads contend on that one lock.
+ @Test
+ public void severalThreadsSequentialAccessingListOfGuids() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 10);
+ int i = 0;
+ th[i++].setIds("1", "2", "3", "4", "5");
+ th[i++].setIds("1", "2", "3", "4");
+ th[i++].setIds("1", "2", "3");
+ th[i++].setIds("1", "2");
+ th[i++].setIds("1");
+ th[i++].setIds("1", "2");
+ th[i++].setIds("1", "2", "3");
+ th[i++].setIds("1", "2", "3", "4");
+ th[i++].setIds("1", "2", "3", "4", "5");
+ th[i++].setIds("1");
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayEquals(populateExpectedArrayOutput(th.length));
+ }
+
+ // Same idea as above, but the ids are passed in unsorted order; all lists still contain "1".
+ @Test
+ public void severalThreadsNonSequentialAccessingListOfGuids() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 5);
+ int i = 0;
+ th[i++].setIds("2", "1", "3", "4", "5");
+ th[i++].setIds("3", "2", "4", "1");
+ th[i++].setIds("2", "3", "1");
+ th[i++].setIds("1", "2");
+ th[i++].setIds("1");
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayEquals(populateExpectedArrayOutput(th.length));
+ }
+
+ // No single id is common to all threads; only neighbouring lists overlap.
+ @Test
+ public void severalThreadsAccessingOverlappingListOfGuids() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 5);
+ int i = 0;
+ th[i++].setIds("1", "2", "3", "4", "5");
+ th[i++].setIds("3", "4", "5", "6");
+ th[i++].setIds("5", "6", "7");
+ th[i++].setIds("7", "8");
+ th[i++].setIds("8");
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayNotEquals(populateExpectedArrayOutput(th.length));
+ }
+
+
+ // The first two threads use disjoint ids; the third overlaps with both of them.
+ @Test
+ public void severalThreadsAccessingOverlappingListOfGuids2() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 3);
+ int i = 0;
+ th[i++].setIds("1", "2", "3", "4", "5");
+ th[i++].setIds("6", "7", "8", "9");
+ th[i++].setIds("4", "5", "6");
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayNotEquals(populateExpectedArrayOutput(th.length));
+ }
+
+ // Every list contains "7", so all threads contend on that one lock.
+ @Test
+ public void severalThreadsAccessingOverlappingListOfGuidsEnsuringSerialOutput() throws InterruptedException {
+ CounterThread th[] = getCounterThreads(false, 5);
+ int i = 0;
+ th[i++].setIds("1", "2", "3", "4", "7");
+ th[i++].setIds("3", "4", "5", "7");
+ th[i++].setIds("5", "6", "7");
+ th[i++].setIds("7", "8");
+ th[i++].setIds("7");
+
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayEquals(populateExpectedArrayOutput(th.length));
+ }
+
+ // Runs 'limit' threads that all lock id "1" and expects fully serialized output.
+ private void verifyMultipleThreadRun(int limit) throws InterruptedException {
+ CounterThread[] th = getCounterThreads(limit);
+ startCounterThreads(th);
+ waitForThreadsToEnd(th);
+ assertArrayEquals(populateExpectedArrayOutput(limit));
+ }
+
+ private void startCounterThreads(CounterThread[] th) {
+ for (int i = 0; i < th.length; i++) {
+ th[i].start();
+ }
+ }
+ // Convenience overload: all threads share the id "1".
+ private CounterThread[] getCounterThreads(int limit) {
+ return getCounterThreads(true, limit);
+ }
+
+ // Creates 'limit' threads; with sameId they all use "1", otherwise each uses its own index as id.
+ private CounterThread[] getCounterThreads(boolean sameId, int limit) {
+ CounterThread th[] = new CounterThread[limit];
+ for (Integer i = 0; i < limit; i++) {
+ th[i] = new CounterThread(sameId ? "1" : i.toString());
+ }
+ return th;
+ }
+
+
+ // Asserts that outputList matches 'expected' element by element.
+ private void assertArrayEquals(List<Integer> expected) {
+ assertEquals(outputList.toArray(), expected.toArray());
+ }
+
+ // NOTE(review): the second argument is the List itself, not expected.toArray(), so an
+ // Object[] is compared with a List. That comparison looks like it can never be equal,
+ // which would make this assertion pass regardless of the output - confirm the intent.
+ // Comparing against expected.toArray() would make the callers depend on the threads
+ // actually interleaving at run time.
+ private void assertArrayNotEquals(List<Integer> expected) {
+ assertFalse(ArrayUtils.isEquals(outputList.toArray(), expected));
+ }
+
+ private void waitForThreadsToEnd(CounterThread... threads) throws InterruptedException {
+ for (Thread t : threads) {
+ t.join();
+ }
+ }
+
+ // Builds the serialized expectation: 'limit' back-to-back runs of 0..MAX_COUNT-1.
+ private List<Integer> populateExpectedArrayOutput(int limit) {
+ List<Integer> list = new ArrayList<>();
+ for (int i = 0; i < limit*MAX_COUNT; i+=MAX_COUNT) {
+ for (int j = 0; j < MAX_COUNT; j++) {
+ list.add(j);
+ }
+ }
+
+ return list;
+ }
+}