You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/22 17:29:10 UTC

airavata git commit: Fixed: ack the cancel delivery tag only once; multiple acks cause the consumer to be unbound from the queue.

Repository: airavata
Updated Branches:
  refs/heads/master a74790a20 -> 83feaecee


Fixed: ack the cancel delivery tag only once; multiple acks cause the consumer to be unbound from the queue.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/83feaece
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/83feaece
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/83feaece

Branch: refs/heads/master
Commit: 83feaecee113239f028baa9c829fe4b60cf70682
Parents: a74790a
Author: shamrath <sh...@gmail.com>
Authored: Fri May 22 11:29:06 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri May 22 11:29:06 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 28 +++++++++++---------
 .../gfac/bes/provider/impl/BESProvider.java     |  3 ++-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  8 +++---
 .../gfac/core/provider/GFacProvider.java        |  3 ++-
 .../airavata/gfac/core/utils/GFacUtils.java     | 23 +++++++++++++---
 .../org/apache/airavata/job/TestProvider.java   |  3 ++-
 .../gsissh/provider/impl/GSISSHProvider.java    |  8 +++---
 .../gfac/local/provider/impl/LocalProvider.java |  2 +-
 .../gfac/ssh/provider/impl/SSHProvider.java     |  8 +++---
 9 files changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 6cdbc29..5d747aa 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -49,7 +49,6 @@ import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.persistance.registry.jpa.model.Status;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
@@ -399,31 +398,36 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     logger.error("Error while updating experiment status", e);
                 }
             } else if (message.getType().equals(MessageType.TERMINATETASK)) {
+                boolean cancelSuccess = false;
+                TaskTerminateEvent event = new TaskTerminateEvent();
+                TBase messageEvent = message.getEvent();
                 try {
-                    TaskTerminateEvent event = new TaskTerminateEvent();
-                    TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), zk, message.getDeliveryTag());
                     if (saveDeliveryTagSuccess) {
-                        cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+                        cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
                         System.out.println(" Message Received with message id '" + message.getMessageId()
                                 + "' and with message type '" + message.getType());
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     } else {
                         throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
                                 "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled.");
                     }
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
-                    if (rabbitMQTaskLaunchConsumer.isOpen()) {
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                }finally {
+                    if (cancelSuccess) {
+                        // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
                     } else {
                         try {
-                            rabbitMQTaskLaunchConsumer.reconnect();
-                            rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                        } catch (AiravataException e1) {
-                            logger.error("RabbitMQ reconnect attempt failed.");
+                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), zk)) {
+                                if (!rabbitMQTaskLaunchConsumer.isOpen()) {
+                                    rabbitMQTaskLaunchConsumer.reconnect();
+                                }
+                                rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                            }
+                        } catch (Exception e) {
+                            logger.error("Error while ack to cancel request, experimentId: " + event.getExperimentId());
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index f62f3f4..dad2a4d 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -403,9 +403,10 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
 	}
 
 	@Override
-	public void cancelJob(JobExecutionContext jobExecutionContext)
+	public boolean cancelJob(JobExecutionContext jobExecutionContext)
 			throws GFacProviderException, GFacException {
 		// TODO Auto-generated method stub
+        return false;
 	}
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 9f31e7e..d0db56c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -554,6 +554,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+
             if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
                     || gfacExpState == GfacExperimentState.PROVIDERINVOKED) { // we already have changed registry status, we need to handle job canceling scenario.
                 log.info("Job is in a position to perform a proper cancellation");
@@ -790,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     }
 
-    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
+    private boolean invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             initProvider(provider, jobExecutionContext);
@@ -800,6 +801,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
             invokeOutFlowHandlers(jobExecutionContext);
         }
+        return true;
     }
 
     // TODO - Did refactoring, but need to recheck the logic again.
@@ -845,9 +847,9 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+    private boolean cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
         try {
-            provider.cancelJob(jobExecutionContext);
+            return provider.cancelJob(jobExecutionContext);
         } catch (Exception e) {
             throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index e031980..f13f1b3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -54,9 +54,10 @@ public interface GFacProvider{
     /**
      * Cancels all jobs relevant to an experiment.
      * @param jobExecutionContext The job execution context, contains runtime information.
+     * @return <code>true</code> if cancel operation success. <code>false</code> otherwise.
      * @throws GFacException If an error occurred while cancelling the job.
      */
-    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
 
 
     /**

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 8b62425..9e3a50f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -545,11 +545,8 @@ public class GFacUtils {
 		return null;
 	}
 
-    public static boolean setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
-                                           String pickedChild, String tokenId, long deliveryTag)throws KeeperException,
+    public static boolean setExperimentCancel(String experimentId, ZooKeeper zk, long deliveryTag) throws KeeperException,
             InterruptedException {
-        String experimentPath = experimentNode + File.separator + pickedChild;
-        String newExpNode = experimentPath + File.separator + experimentId;
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
         if (experimentEntry == null) {
             // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
@@ -764,4 +761,22 @@ public class GFacUtils {
         }
         return false;
     }
+
+    public static boolean ackCancelRequest(String experimentId, ZooKeeper zk) throws KeeperException, InterruptedException {
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+        String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
+        if (experimentEntry == null) {
+            // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
+            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
+                    "This happen when experiment completed and already removed from the zookeeper");
+        } else {
+            // check cancel operation is being processed for the same experiment.
+            Stat cancelState = zk.exists(cancelNodePath, false);
+            if (cancelState != null) {
+                ZKUtil.deleteRecursive(zk,cancelNodePath);
+                return true;
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index 5881203..8653e03 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -29,8 +29,9 @@ import java.util.Map;
 
 public class TestProvider extends AbstractProvider {
 
-    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 444ebf2..c0784fb 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -180,7 +180,7 @@ public class GSISSHProvider extends AbstractProvider {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
         log.info("canceling the job status in GSISSHProvider!!!!!");
         JobDetails jobDetails = jobExecutionContext.getJobDetails();
@@ -199,15 +199,17 @@ public class GSISSHProvider extends AbstractProvider {
             // This installed path is a mandetory field, because this could change based on the computing resource
             if(jobDetails == null) {
                 log.error("There is not JobDetails so cancelations cannot perform !!!");
-                return;
+                return false;
             }
             if (jobDetails.getJobID() != null) {
+                // if this operation success without any exceptions, we can assume cancel operation succeeded.
                 cluster.cancelJob(jobDetails.getJobID());
             } else {
                 log.error("No Job Id is set, so cannot perform the cancel operation !!!");
-                return;
+                return false;
             }
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+            return true;
             // we know this host is type GsiSSHHostType
         } catch (SSHApiException e) {
             String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 6d84cf2..ef66171 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -246,7 +246,7 @@ public class LocalProvider extends AbstractProvider {
         }
     }
 
-    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index ada99fb..19ea3ac 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -240,7 +240,7 @@ public class SSHProvider extends AbstractProvider {
 
     }
 
-    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
         JobDetails jobDetails = jobExecutionContext.getJobDetails();
         StringBuffer data = new StringBuffer();
         String hostAddress = jobExecutionContext.getHostName();
@@ -256,12 +256,14 @@ public class SSHProvider extends AbstractProvider {
             // This installed path is a mandetory field, because this could change based on the computing resource
             if (jobDetails == null) {
                 log.error("There is not JobDetails, Cancel request can't be performed !!!");
-                return;
+                return false;
             }
             try {
                 if (jobDetails.getJobID() != null) {
                     if (cluster.cancelJob(jobDetails.getJobID()) != null) {
+                        // if this operation success without any exceptions, we can assume cancel operation succeeded.
                         GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+                        return true;
                     } else {
                         log.info("Job Cancel operation failed");
                     }
@@ -284,7 +286,7 @@ public class SSHProvider extends AbstractProvider {
                 GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
 //                throw new GFacProviderException(error, e);
             }
-            // we know this host is type GsiSSHHostType
+            return false;
         }
     }