You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/07 10:35:39 UTC
falcon git commit: FALCON-1410 Entity submit fails when multiple
threads try submitting same definition. Contributed by Sandeep Samudrala.
Repository: falcon
Updated Branches:
refs/heads/master 3ec8d9534 -> bd0028458
FALCON-1410 Entity submit fails when multiple threads try submitting same definition. Contributed by Sandeep Samudrala.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd002845
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd002845
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd002845
Branch: refs/heads/master
Commit: bd002845867f886c75b773daeea5952d06fdef76
Parents: 3ec8d95
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Sep 7 13:32:17 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Sep 7 13:32:17 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/resource/AbstractEntityManager.java | 44 +++++---
.../falcon/resource/EntityManagerJerseyIT.java | 102 +++++++++++++++++--
.../org/apache/falcon/resource/TestContext.java | 7 ++
4 files changed, 133 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ac2cd1..88d0f64 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava)
+
FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava)
FALCON-1416 Add ACL (if missing) during touch(Narayan Periwal via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 03efa20..f8f36b2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -244,6 +244,7 @@ public abstract class AbstractEntityManager {
*/
public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
checkColo(colo);
+ List<Entity> tokenList = null;
try {
EntityType entityType = EntityType.getEnum(type);
String removedFromEngine = "";
@@ -251,6 +252,7 @@ public abstract class AbstractEntityManager {
Entity entityObj = EntityUtil.getEntity(type, entity);
canRemove(entityObj);
+ tokenList = obtainEntityLocks(entityObj, "delete");
if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) {
getWorkflowEngine().delete(entityObj);
removedFromEngine = "(KILLED in ENGINE)";
@@ -267,6 +269,8 @@ public abstract class AbstractEntityManager {
} catch (Throwable e) {
LOG.error("Unable to reach workflow engine for deletion or deletion failed", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+ } finally {
+ releaseEntityLocks(entity, tokenList);
}
}
@@ -285,7 +289,7 @@ public abstract class AbstractEntityManager {
validateUpdate(oldEntity, newEntity);
configStore.initiateUpdate(newEntity);
- tokenList = obtainUpdateEntityLocks(oldEntity);
+ tokenList = obtainEntityLocks(oldEntity, "update");
StringBuilder result = new StringBuilder("Updated successfully");
//Update in workflow engine
@@ -311,11 +315,11 @@ public abstract class AbstractEntityManager {
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
} finally {
ConfigurationStore.get().cleanupUpdateInit();
- releaseUpdateEntityLocks(entityName, tokenList);
+ releaseEntityLocks(entityName, tokenList);
}
}
- private List<Entity> obtainUpdateEntityLocks(Entity entity)
+ private List<Entity> obtainEntityLocks(Entity entity, String command)
throws FalconException {
List<Entity> tokenList = new ArrayList<Entity>();
@@ -323,25 +327,28 @@ public abstract class AbstractEntityManager {
if (memoryLocks.acquireLock(entity)) {
tokenList.add(entity);
} else {
- throw new FalconException("Looks like an update command is already issued for " + entity.toShortString());
+ throw new FalconException(command + " command is already issued for " + entity.toShortString());
}
- //now obtain locks for all dependent entities.
+ //now obtain locks for all dependent entities if any.
Set<Entity> affectedEntities = EntityGraph.get().getDependents(entity);
- for (Entity e : affectedEntities) {
- if (memoryLocks.acquireLock(e)) {
- tokenList.add(e);
- } else {
- LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks",
- e.toShortString());
- throw new FalconException("There are multiple update commands running for dependent entity "
- + e.toShortString());
+ if (affectedEntities != null) {
+ for (Entity e : affectedEntities) {
+ if (memoryLocks.acquireLock(e)) {
+ tokenList.add(e);
+ LOG.debug("{} on entity {} has acquired lock on {}", command, entity, e);
+ } else {
+ LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks",
+ e.toShortString());
+ throw new FalconException("There are multiple update commands running for dependent entity "
+ + e.toShortString());
+ }
}
}
return tokenList;
}
- private void releaseUpdateEntityLocks(String entityName, List<Entity> tokenList) {
+ private void releaseEntityLocks(String entityName, List<Entity> tokenList) {
if (tokenList != null && !tokenList.isEmpty()) {
for (Entity entity : tokenList) {
memoryLocks.releaseLock(entity);
@@ -391,14 +398,21 @@ public abstract class AbstractEntityManager {
}
}
- protected synchronized Entity submitInternal(HttpServletRequest request, String type)
+ protected Entity submitInternal(HttpServletRequest request, String type)
throws IOException, FalconException {
EntityType entityType = EntityType.getEnum(type);
Entity entity = deserializeEntity(request, entityType);
+ List<Entity> tokenList = null;
// KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
decorateEntityWithACL(entity);
+ try {
+ tokenList = obtainEntityLocks(entity, "submit");
+ }finally {
+ ConfigurationStore.get().cleanupUpdateInit();
+ releaseEntityLocks(entity.getName(), tokenList);
+ }
Entity existingEntity = configStore.get(entityType, entity.getName());
if (existingEntity != null) {
if (EntityUtil.equals(existingEntity, entity)) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index f0cee61..bcd3bd5 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -574,6 +574,55 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(response);
}
+    /**
+     * Submits the same cluster definition from two separate executor threads and verifies, via
+     * {@code testDuplicateCommandsResponse}, that one request is reported successful and the other
+     * is reported failed.
+     *
+     * @throws Exception if either submission task fails or the calling thread is interrupted
+     */
+    @Test
+    public void testDuplicateSubmitCommands() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        ExecutorService duplicateService = Executors.newSingleThreadExecutor();
+        try {
+            Future<ClientResponse> future = service.submit(new SubmitCommand(context, overlay));
+            Future<ClientResponse> duplicateFuture = duplicateService.submit(new SubmitCommand(context, overlay));
+
+            ClientResponse response = future.get();
+            ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
+
+            // since there are duplicate threads for submits, there is no guarantee which request will succeed.
+            testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
+        } finally {
+            // The executors' worker threads are non-daemon; stop them even when an assertion fails
+            // so they do not outlive the test.
+            service.shutdownNow();
+            duplicateService.shutdownNow();
+        }
+    }
+
+    /**
+     * Submits a cluster, then issues two delete requests for it from separate executor threads and
+     * verifies, via {@code testDuplicateCommandsResponse}, that one request is reported successful
+     * and the other is reported failed.
+     *
+     * @throws Exception if the initial submit or either delete task fails, or the calling thread
+     *                   is interrupted
+     */
+    @Test
+    public void testDuplicateDeleteCommands() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        String clusterName = overlay.get("cluster");
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        ExecutorService duplicateService = Executors.newSingleThreadExecutor();
+        try {
+            Future<ClientResponse> future = service.submit(new DeleteCommand(context, clusterName, "cluster"));
+            Future<ClientResponse> duplicateFuture = duplicateService.submit(
+                    new DeleteCommand(context, clusterName, "cluster"));
+
+            ClientResponse response = future.get();
+            ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
+
+            // since there are duplicate threads for deletion, there is no guarantee which request will succeed.
+            testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
+        } finally {
+            // The executors' worker threads are non-daemon; stop them even when an assertion fails
+            // so they do not outlive the test.
+            service.shutdownNow();
+            duplicateService.shutdownNow();
+        }
+    }
+
+    /**
+     * Checks the outcome of two racing, identical requests: whichever way the race went, one
+     * response must pass {@code assertSuccessful} and the other must pass {@code assertFailure}.
+     * The HTTP status of {@code response} decides which role each response is expected to play.
+     *
+     * @param context                       test context providing the assertion helpers
+     * @param response                      response of the first request
+     * @param duplicateSubmitThreadResponse response of the duplicate request
+     */
+    private void testDuplicateCommandsResponse(TestContext context, ClientResponse response,
+                                               ClientResponse duplicateSubmitThreadResponse) {
+        boolean firstRequestWon = response.getStatus() == Response.Status.OK.getStatusCode();
+        if (!firstRequestWon) {
+            // The first request lost the race, so the duplicate must be the one that went through.
+            context.assertFailure(response);
+            context.assertSuccessful(duplicateSubmitThreadResponse);
+            return;
+        }
+        context.assertSuccessful(response);
+        context.assertFailure(duplicateSubmitThreadResponse);
+    }
+
public void testProcesssScheduleAndDelete() throws Exception {
TestContext context = newContext();
ClientResponse clientResponse;
@@ -886,13 +935,7 @@ public class EntityManagerJerseyIT {
ClientResponse duplicateUpdateThreadResponse = future.get();
// since there are duplicate threads for updates, there is no guarantee which request will succeed
- if (response.getStatus() == Response.Status.OK.getStatusCode()) {
- context.assertSuccessful(response);
- context.assertFailure(duplicateUpdateThreadResponse);
- } else {
- context.assertFailure(response);
- context.assertSuccessful(duplicateUpdateThreadResponse);
- }
+ testDuplicateCommandsResponse(context, response, duplicateUpdateThreadResponse);
}
@@ -932,4 +975,49 @@ public class EntityManagerJerseyIT {
return update(context, process, endTime, false);
}
}
+
+    /**
+     * Callable wrapper around a cluster submission so that the request can be executed on an
+     * executor thread and its response collected through a {@code Future}.
+     */
+    class SubmitCommand implements Callable<ClientResponse> {
+        private final TestContext context;
+        private final Map<String, String> overlay;
+
+        public SubmitCommand(TestContext context, Map<String, String> overlay) {
+            this.context = context;
+            this.overlay = overlay;
+        }
+
+        public TestContext getContext() {
+            return context;
+        }
+
+        public Map<String, String> getOverlay() {
+            return overlay;
+        }
+
+        /** Submits the cluster template with the stored overlay and returns the server response. */
+        @Override
+        public ClientResponse call() throws Exception {
+            return context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        }
+    }
+
+    /**
+     * Callable wrapper around an entity delete request so that the request can be executed on an
+     * executor thread and its response collected through a {@code Future}.
+     */
+    class DeleteCommand implements Callable<ClientResponse> {
+        private final TestContext context;
+        private final String entityName;
+        private final String entityType;
+
+        public DeleteCommand(TestContext context, String entityName, String entityType) {
+            this.context = context;
+            this.entityName = entityName;
+            this.entityType = entityType;
+        }
+
+        public TestContext getContext() {
+            return context;
+        }
+
+        /** Issues the delete for the stored entity name and type and returns the server response. */
+        @Override
+        public ClientResponse call() throws Exception {
+            return context.deleteFromFalcon(entityName, entityType);
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd002845/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 4a25b88..54671fb 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -384,6 +384,13 @@ public class TestContext {
.post(ClientResponse.class, rawlogStream);
}
+    /**
+     * Sends an HTTP DELETE to {@code api/entities/delete/<entityType>/<entityName>} with the
+     * authentication cookie attached and returns the raw response.
+     *
+     * @param entityName name of the entity to delete; it is lower-cased before being put in the path
+     * @param entityType entity type path segment, e.g. {@code "cluster"}
+     * @return the response returned by the server for the delete request
+     * @throws IOException declared for callers; kept so existing call sites continue to compile
+     */
+    public ClientResponse deleteFromFalcon(String entityName, String entityType) throws IOException {
+        // Lower-case with a fixed locale: the no-arg toLowerCase() follows the JVM default locale
+        // and can change ASCII letters (e.g. 'I' under a Turkish locale), producing a wrong path.
+        String normalizedName = entityName.toLowerCase(java.util.Locale.ROOT);
+        return this.service.path("api/entities/delete/" + entityType + "/" + normalizedName)
+                .header("Cookie", getAuthenticationToken())
+                .accept(MediaType.TEXT_XML)
+                .delete(ClientResponse.class);
+    }
+
public void assertStatus(ClientResponse clientResponse, APIResult.Status status) {
String response = clientResponse.getEntity(String.class);
try {