You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/07 10:35:39 UTC

falcon git commit: FALCON-1410 Entity submit fails when multiple threads try submitting same definition. Contributed by Sandeep Samudrala.

Repository: falcon
Updated Branches:
  refs/heads/master 3ec8d9534 -> bd0028458


FALCON-1410 Entity submit fails when multiple threads try submitting same definition. Contributed by Sandeep Samudrala.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd002845
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd002845
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd002845

Branch: refs/heads/master
Commit: bd002845867f886c75b773daeea5952d06fdef76
Parents: 3ec8d95
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Sep 7 13:32:17 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Sep 7 13:32:17 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/resource/AbstractEntityManager.java  |  44 +++++---
 .../falcon/resource/EntityManagerJerseyIT.java  | 102 +++++++++++++++++--
 .../org/apache/falcon/resource/TestContext.java |   7 ++
 4 files changed, 133 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ac2cd1..88d0f64 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava)
+
     FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava)
 
     FALCON-1416 Add ACL (if missing) during touch(Narayan Periwal via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 03efa20..f8f36b2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -244,6 +244,7 @@ public abstract class AbstractEntityManager {
      */
     public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
         checkColo(colo);
+        List<Entity> tokenList = null;
         try {
             EntityType entityType = EntityType.getEnum(type);
             String removedFromEngine = "";
@@ -251,6 +252,7 @@ public abstract class AbstractEntityManager {
                 Entity entityObj = EntityUtil.getEntity(type, entity);
 
                 canRemove(entityObj);
+                tokenList = obtainEntityLocks(entityObj, "delete");
                 if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) {
                     getWorkflowEngine().delete(entityObj);
                     removedFromEngine = "(KILLED in ENGINE)";
@@ -267,6 +269,8 @@ public abstract class AbstractEntityManager {
         } catch (Throwable e) {
             LOG.error("Unable to reach workflow engine for deletion or deletion failed", e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        } finally {
+            releaseEntityLocks(entity, tokenList);
         }
     }
 
@@ -285,7 +289,7 @@ public abstract class AbstractEntityManager {
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
 
-            tokenList = obtainUpdateEntityLocks(oldEntity);
+            tokenList = obtainEntityLocks(oldEntity, "update");
 
             StringBuilder result = new StringBuilder("Updated successfully");
             //Update in workflow engine
@@ -311,11 +315,11 @@ public abstract class AbstractEntityManager {
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
         } finally {
             ConfigurationStore.get().cleanupUpdateInit();
-            releaseUpdateEntityLocks(entityName, tokenList);
+            releaseEntityLocks(entityName, tokenList);
         }
     }
 
-    private List<Entity> obtainUpdateEntityLocks(Entity entity)
+    private List<Entity> obtainEntityLocks(Entity entity, String command)
         throws FalconException {
         List<Entity> tokenList = new ArrayList<Entity>();
 
@@ -323,25 +327,28 @@ public abstract class AbstractEntityManager {
         if (memoryLocks.acquireLock(entity)) {
             tokenList.add(entity);
         } else {
-            throw new FalconException("Looks like an update command is already issued for " + entity.toShortString());
+            throw new FalconException(command + " command is already issued for " + entity.toShortString());
         }
 
-        //now obtain locks for all dependent entities.
+        //now obtain locks for all dependent entities if any.
         Set<Entity> affectedEntities = EntityGraph.get().getDependents(entity);
-        for (Entity e : affectedEntities) {
-            if (memoryLocks.acquireLock(e)) {
-                tokenList.add(e);
-            } else {
-                LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks",
-                        e.toShortString());
-                throw new FalconException("There are multiple update commands running for dependent entity "
-                        + e.toShortString());
+        if (affectedEntities != null) {
+            for (Entity e : affectedEntities) {
+                if (memoryLocks.acquireLock(e)) {
+                    tokenList.add(e);
+                    LOG.debug("{} on entity {} has acquired lock on {}", command, entity, e);
+                } else {
+                    LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks",
+                            e.toShortString());
+                    throw new FalconException("There are multiple update commands running for dependent entity "
+                            + e.toShortString());
+                }
             }
         }
         return tokenList;
     }
 
-    private void releaseUpdateEntityLocks(String entityName, List<Entity> tokenList) {
+    private void releaseEntityLocks(String entityName, List<Entity> tokenList) {
         if (tokenList != null && !tokenList.isEmpty()) {
             for (Entity entity : tokenList) {
                 memoryLocks.releaseLock(entity);
@@ -391,14 +398,21 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    protected synchronized Entity submitInternal(HttpServletRequest request, String type)
+    protected Entity submitInternal(HttpServletRequest request, String type)
         throws IOException, FalconException {
 
         EntityType entityType = EntityType.getEnum(type);
         Entity entity = deserializeEntity(request, entityType);
+        List<Entity> tokenList = null;
         // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
         decorateEntityWithACL(entity);
 
+        try {
+            tokenList = obtainEntityLocks(entity, "submit");
+        }finally {
+            ConfigurationStore.get().cleanupUpdateInit();
+            releaseEntityLocks(entity.getName(), tokenList);
+        }
         Entity existingEntity = configStore.get(entityType, entity.getName());
         if (existingEntity != null) {
             if (EntityUtil.equals(existingEntity, entity)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index f0cee61..bcd3bd5 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -574,6 +574,55 @@ public class EntityManagerJerseyIT {
         context.assertSuccessful(response);
     }
 
+    @Test
+    public void testDuplicateSubmitCommands() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        ExecutorService duplicateService = Executors.newSingleThreadExecutor();
+
+        Future<ClientResponse> future = service.submit(new SubmitCommand(context, overlay));
+        Future<ClientResponse> duplicateFuture = duplicateService.submit(new SubmitCommand(context, overlay));
+
+        ClientResponse response = future.get();
+        ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
+
+        // since there are duplicate threads for submits, there is no guarantee which request will succeed.
+        testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
+    }
+
+    @Test
+    public void testDuplicateDeleteCommands() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        ExecutorService duplicateService = Executors.newSingleThreadExecutor();
+
+        Future<ClientResponse> future = service.submit(new DeleteCommand(context, overlay.get("cluster"), "cluster"));
+        Future<ClientResponse> duplicateFuture = duplicateService.submit(new DeleteCommand(context,
+                overlay.get("cluster"), "cluster"));
+
+        ClientResponse response = future.get();
+        ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
+
+        // since there are duplicate threads for deletion, there is no guarantee which request will succeed.
+        testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
+    }
+
+    private void testDuplicateCommandsResponse(TestContext context, ClientResponse response,
+                                               ClientResponse duplicateSubmitThreadResponse) {
+        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+            context.assertSuccessful(response);
+            context.assertFailure(duplicateSubmitThreadResponse);
+        } else {
+            context.assertFailure(response);
+            context.assertSuccessful(duplicateSubmitThreadResponse);
+        }
+    }
+
     public void testProcesssScheduleAndDelete() throws Exception {
         TestContext context = newContext();
         ClientResponse clientResponse;
@@ -886,13 +935,7 @@ public class EntityManagerJerseyIT {
         ClientResponse duplicateUpdateThreadResponse = future.get();
 
         // since there are duplicate threads for updates, there is no guarantee which request will succeed
-        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
-            context.assertSuccessful(response);
-            context.assertFailure(duplicateUpdateThreadResponse);
-        } else {
-            context.assertFailure(response);
-            context.assertSuccessful(duplicateUpdateThreadResponse);
-        }
+        testDuplicateCommandsResponse(context, response, duplicateUpdateThreadResponse);
 
     }
 
@@ -932,4 +975,49 @@ public class EntityManagerJerseyIT {
             return update(context, process, endTime, false);
         }
     }
+
+    class SubmitCommand implements Callable<ClientResponse> {
+        private Map<String, String> overlay;
+        private TestContext context;
+
+        public TestContext getContext() {
+            return context;
+        }
+
+        public Map<String, String> getOverlay() {
+            return overlay;
+        }
+
+        public SubmitCommand(TestContext context, Map<String, String> overlay) {
+            this.context = context;
+            this.overlay = overlay;
+        }
+
+        @Override
+        public ClientResponse call() throws Exception {
+            return context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        }
+    }
+
+    class DeleteCommand implements Callable<ClientResponse> {
+        private TestContext context;
+        private String entityName;
+        private String entityType;
+
+        public TestContext getContext() {
+            return context;
+        }
+
+        public DeleteCommand(TestContext context, String entityName, String entityType) {
+            this.context = context;
+            this.entityName = entityName;
+            this.entityType = entityType;
+        }
+
+        @Override
+        public ClientResponse call() throws Exception {
+            return context.deleteFromFalcon(entityName, entityType);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 4a25b88..54671fb 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -384,6 +384,13 @@ public class TestContext {
                 .post(ClientResponse.class, rawlogStream);
     }
 
+    public ClientResponse deleteFromFalcon(String entityName, String entityType) throws IOException{
+        return this.service.path("api/entities/delete/" + entityType + "/" + entityName.toLowerCase())
+                .header("Cookie", getAuthenticationToken())
+                .accept(MediaType.TEXT_XML)
+                .delete(ClientResponse.class);
+    }
+
     public void assertStatus(ClientResponse clientResponse, APIResult.Status status) {
         String response = clientResponse.getEntity(String.class);
         try {