You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/26 15:14:13 UTC

[3/5] incubator-atlas git commit: ATLAS-1889: fix to handle concurrent calls to update tags for an entity

ATLAS-1889: fix to handle concurrent calls to update tags for an entity

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bcb128af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bcb128af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bcb128af

Branch: refs/heads/feature-odf
Commit: bcb128af3283432a1da3ab4a524cd200f647f862
Parents: e0abdb3
Author: ashutoshm <am...@hortonworks.com>
Authored: Thu Jun 22 07:35:04 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Jun 22 10:20:14 2017 -0700

----------------------------------------------------------------------
 .../atlas/GraphTransactionInterceptor.java      | 120 ++++++++++
 .../graph/GraphBackedMetadataRepository.java    |  28 ++-
 .../store/graph/v1/AtlasEntityStoreV1.java      |   7 +
 .../utils/ObjectUpdateSynchronizerTest.java     | 218 +++++++++++++++++++
 4 files changed, 364 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 7d3bdf7..c6a4bbe 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.atlas;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -29,12 +30,18 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 @Component
 public class GraphTransactionInterceptor implements MethodInterceptor {
     private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
 
+    @VisibleForTesting
+    private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
     private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
 
     private final AtlasGraph graph;
@@ -82,9 +89,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
                     }
                 }
             }
+
+            OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
         }
     }
 
+    public static void lockObjectAndReleasePostCommit(final String guid) {
+        OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid);
+    }
+
+    public static void lockObjectAndReleasePostCommit(final List<String> guids) {
+        OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids);
+    }
+
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
@@ -110,4 +127,107 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
 
         public abstract void onComplete(boolean isSuccess);
     }
+
+    private static class RefCountedReentrantLock extends ReentrantLock {
+        private int refCount;
+
+        public RefCountedReentrantLock() {
+            this.refCount = 0;
+        }
+
+        public int increment() {
+            return ++refCount;
+        }
+
+        public int decrement() {
+            return --refCount;
+        }
+
+        public int getRefCount() { return refCount; }
+    }
+
+
+    public static class ObjectUpdateSynchronizer {
+        private final Map<String, RefCountedReentrantLock> guidLockMap = new ConcurrentHashMap<>();
+        private final ThreadLocal<List<String>>  lockedGuids = new ThreadLocal<List<String>>() {
+            @Override
+            protected List<String> initialValue() {
+                return new ArrayList<>();
+            }
+        };
+
+        public void lockObject(final List<String> guids) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("==> lockObject(): guids: {}", guids);
+            }
+
+            Collections.sort(guids);
+            for (String g : guids) {
+                lockObject(g);
+            }
+        }
+
+        private void lockObject(final String guid) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("==> lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
+            }
+
+            ReentrantLock lock = getOrCreateObjectLock(guid);
+            lock.lock();
+
+            lockedGuids.get().add(guid);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
+            }
+        }
+
+        public void releaseLockedObjects() {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("==> releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
+            }
+
+            for (String guid : lockedGuids.get()) {
+                releaseObjectLock(guid);
+            }
+
+            lockedGuids.get().clear();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
+            }
+        }
+
+        private RefCountedReentrantLock getOrCreateObjectLock(String guid) {
+            synchronized (guidLockMap) {
+                RefCountedReentrantLock ret = guidLockMap.get(guid);
+                if (ret == null) {
+                    ret = new RefCountedReentrantLock();
+                    guidLockMap.put(guid, ret);
+                }
+
+                ret.increment();
+                return ret;
+            }
+        }
+
+        private RefCountedReentrantLock releaseObjectLock(String guid) {
+            synchronized (guidLockMap) {
+                RefCountedReentrantLock lock = guidLockMap.get(guid);
+                if (lock != null && lock.isHeldByCurrentThread()) {
+                    int refCount = lock.decrement();
+
+                    if (refCount == 0) {
+                        guidLockMap.remove(guid);
+                    }
+
+                    lock.unlock();
+                } else {
+                    LOG.warn("releaseLockedObjects: {} Attempting to release a lock not held by current thread.", guid);
+                }
+
+                return lock;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 5bec8fa..0f3b06b 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.graph;
 import com.google.common.base.Preconditions;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.CreateUpdateEntitiesResult;
+import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.model.instance.GuidMapping;
@@ -49,7 +50,16 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * An implementation backed by a Graph database provided
@@ -303,6 +313,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
             LOG.debug("Adding a new trait={} for entities={}", traitInstance.getTypeName(), entityGuids);
         }
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuids);
         for (String entityGuid : entityGuids) {
             addTraitImpl(entityGuid, traitInstance);
         }
@@ -321,12 +332,12 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
         Preconditions.checkNotNull(guid, "guid cannot be null");
         Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
         addTraitImpl(guid, traitInstance);
     }
 
     private void addTraitImpl(String guid, ITypedStruct traitInstance) throws RepositoryException {
         final String traitName = traitInstance.getTypeName();
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding a new trait={} for entity={}", traitName, guid);
         }
@@ -365,9 +376,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     @Override
     @GraphTransaction
     public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
-        }
+        LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
 
         AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid);
 
@@ -383,11 +393,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
             AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
             if(edge != null) {
                 deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
-
-                // update the traits in entity once trait removal is successful
-                traitNames.remove(traitNameToBeDeleted);
-                updateTraits(instanceVertex, traitNames);
             }
+
+            // update the traits in entity once trait removal is successful
+            traitNames.remove(traitNameToBeDeleted);
+            updateTraits(instanceVertex, traitNames);
         } catch (Exception e) {
             throw new RepositoryException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 75e9132..5ea4ff2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v1;
 
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -456,6 +457,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             LOG.debug("Adding classifications={} to entity={}", classifications, guid);
         }
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
         for (AtlasClassification classification : classifications) {
             validateAndNormalize(classification);
         }
@@ -484,6 +486,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
         }
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
         List<AtlasClassification> updatedClassifications = new ArrayList<>();
 
         for (AtlasClassification newClassification : newClassifications) {
@@ -527,6 +530,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             LOG.debug("Adding classification={} to entities={}", classification, guids);
         }
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
+
         validateAndNormalize(classification);
 
         List<AtlasClassification> classifications = Collections.singletonList(classification);
@@ -557,6 +562,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
         }
 
+        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+
         entityGraphMapper.deleteClassifications(guid, classificationNames);
 
         // notify listeners on classification deletion

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bcb128af/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
new file mode 100644
index 0000000..03ebae4
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.springframework.util.CollectionUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+public class ObjectUpdateSynchronizerTest {
+    private static final GraphTransactionInterceptor.ObjectUpdateSynchronizer objectUpdateSynchronizer = new GraphTransactionInterceptor.ObjectUpdateSynchronizer();
+
+    private final List<Integer> outputList = new ArrayList<>();
+    private final int MAX_COUNT = 10;
+
+    class CounterThread extends Thread {
+        String ids[];
+        public CounterThread(String id) {
+            this.ids = new String[1];
+            this.ids[0] = id;
+        }
+
+        public void setIds(String... ids) {
+            this.ids = ids;
+        }
+
+        public void run() {
+            objectUpdateSynchronizer.lockObject(CollectionUtils.arrayToList(ids));
+            for (int i = 0; i < MAX_COUNT; i++) {
+                outputList.add(i);
+                RandomStringUtils.randomAlphabetic(20);
+            }
+
+            objectUpdateSynchronizer.releaseLockedObjects();
+        }
+    }
+
+    @BeforeMethod
+    public void clearOutputList() {
+        outputList.clear();
+    }
+
+    @Test
+    public void singleThreadRun() throws InterruptedException {
+        verifyMultipleThreadRun(1);
+    }
+
+    @Test
+    public void twoThreadsAccessingDifferntGuids_DoNotSerialize() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 2);
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayNotEquals(populateExpectedArrayOutput(2));
+    }
+
+    @Test
+    public void twoThreadsAccessingSameGuid_Serialize() throws InterruptedException {
+        verifyMultipleThreadRun(2);
+    }
+
+    @Test
+    public void severalThreadsAccessingSameGuid_Serialize() throws InterruptedException {
+        verifyMultipleThreadRun(10);
+    }
+
+    @Test
+    public void severalThreadsSequentialAccessingListOfGuids() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 10);
+        int i = 0;
+        th[i++].setIds("1", "2", "3", "4", "5");
+        th[i++].setIds("1", "2", "3", "4");
+        th[i++].setIds("1", "2", "3");
+        th[i++].setIds("1", "2");
+        th[i++].setIds("1");
+        th[i++].setIds("1", "2");
+        th[i++].setIds("1", "2", "3");
+        th[i++].setIds("1", "2", "3", "4");
+        th[i++].setIds("1", "2", "3", "4", "5");
+        th[i++].setIds("1");
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayEquals(populateExpectedArrayOutput(th.length));
+    }
+
+    @Test
+    public void severalThreadsNonSequentialAccessingListOfGuids() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 5);
+        int i = 0;
+        th[i++].setIds("2", "1", "3", "4", "5");
+        th[i++].setIds("3", "2", "4", "1");
+        th[i++].setIds("2", "3", "1");
+        th[i++].setIds("1", "2");
+        th[i++].setIds("1");
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayEquals(populateExpectedArrayOutput(th.length));
+    }
+
+    @Test
+    public void severalThreadsAccessingOverlappingListOfGuids() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 5);
+        int i = 0;
+        th[i++].setIds("1", "2", "3", "4", "5");
+        th[i++].setIds("3", "4", "5", "6");
+        th[i++].setIds("5", "6", "7");
+        th[i++].setIds("7", "8");
+        th[i++].setIds("8");
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayNotEquals(populateExpectedArrayOutput(th.length));
+    }
+
+
+    @Test
+    public void severalThreadsAccessingOverlappingListOfGuids2() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 3);
+        int i = 0;
+        th[i++].setIds("1", "2", "3", "4", "5");
+        th[i++].setIds("6", "7", "8", "9");
+        th[i++].setIds("4", "5", "6");
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayNotEquals(populateExpectedArrayOutput(th.length));
+    }
+
+    @Test
+    public void severalThreadsAccessingOverlappingListOfGuidsEnsuringSerialOutput() throws InterruptedException {
+        CounterThread th[] = getCounterThreads(false, 5);
+        int i = 0;
+        th[i++].setIds("1", "2", "3", "4", "7");
+        th[i++].setIds("3", "4", "5", "7");
+        th[i++].setIds("5", "6", "7");
+        th[i++].setIds("7", "8");
+        th[i++].setIds("7");
+
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayEquals(populateExpectedArrayOutput(th.length));
+    }
+
+    private void verifyMultipleThreadRun(int limit) throws InterruptedException {
+        CounterThread[] th = getCounterThreads(limit);
+        startCounterThreads(th);
+        waitForThreadsToEnd(th);
+        assertArrayEquals(populateExpectedArrayOutput(limit));
+    }
+
+    private void startCounterThreads(CounterThread[] th) {
+        for (int i = 0; i < th.length; i++) {
+            th[i].start();
+        }
+    }
+    private CounterThread[] getCounterThreads(int limit) {
+        return getCounterThreads(true, limit);
+    }
+
+    private CounterThread[] getCounterThreads(boolean sameId, int limit) {
+        CounterThread th[] = new CounterThread[limit];
+        for (Integer i = 0; i < limit; i++) {
+            th[i] = new CounterThread(sameId ? "1" : i.toString());
+        }
+        return th;
+    }
+
+
+    private void assertArrayEquals(List<Integer> expected) {
+        assertEquals(outputList.toArray(), expected.toArray());
+    }
+
+    private void assertArrayNotEquals(List<Integer> expected) {
+        assertFalse(ArrayUtils.isEquals(outputList.toArray(), expected));
+    }
+
+    private void waitForThreadsToEnd(CounterThread... threads) throws InterruptedException {
+        for (Thread t : threads) {
+            t.join();
+        }
+    }
+
+    private List<Integer> populateExpectedArrayOutput(int limit) {
+        List<Integer> list = new ArrayList<>();
+        for (int i = 0; i < limit*MAX_COUNT; i+=MAX_COUNT) {
+            for (int j = 0; j < MAX_COUNT; j++) {
+                list.add(j);
+            }
+        }
+
+        return list;
+    }
+}