You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/22 17:29:10 UTC
airavata git commit: Fixed: ack the cancel delivery tag only once;
multiple acks cause the consumer to be unbound from the queue.
Repository: airavata
Updated Branches:
refs/heads/master a74790a20 -> 83feaecee
Fixed: ack the cancel delivery tag only once; multiple acks cause the consumer to be unbound from the queue.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/83feaece
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/83feaece
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/83feaece
Branch: refs/heads/master
Commit: 83feaecee113239f028baa9c829fe4b60cf70682
Parents: a74790a
Author: shamrath <sh...@gmail.com>
Authored: Fri May 22 11:29:06 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri May 22 11:29:06 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 28 +++++++++++---------
.../gfac/bes/provider/impl/BESProvider.java | 3 ++-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 8 +++---
.../gfac/core/provider/GFacProvider.java | 3 ++-
.../airavata/gfac/core/utils/GFacUtils.java | 23 +++++++++++++---
.../org/apache/airavata/job/TestProvider.java | 3 ++-
.../gsissh/provider/impl/GSISSHProvider.java | 8 +++---
.../gfac/local/provider/impl/LocalProvider.java | 2 +-
.../gfac/ssh/provider/impl/SSHProvider.java | 8 +++---
9 files changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 6cdbc29..5d747aa 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -49,7 +49,6 @@ import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.persistance.registry.jpa.model.Status;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
@@ -399,31 +398,36 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
logger.error("Error while updating experiment status", e);
}
} else if (message.getType().equals(MessageType.TERMINATETASK)) {
+ boolean cancelSuccess = false;
+ TaskTerminateEvent event = new TaskTerminateEvent();
+ TBase messageEvent = message.getEvent();
try {
- TaskTerminateEvent event = new TaskTerminateEvent();
- TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+ boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), zk, message.getDeliveryTag());
if (saveDeliveryTagSuccess) {
- cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+ cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
} else {
throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
"This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
- if (rabbitMQTaskLaunchConsumer.isOpen()) {
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ }finally {
+ if (cancelSuccess) {
+ // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
} else {
try {
- rabbitMQTaskLaunchConsumer.reconnect();
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (AiravataException e1) {
- logger.error("RabbitMQ reconnect attempt failed.");
+ if (GFacUtils.ackCancelRequest(event.getExperimentId(), zk)) {
+ if (!rabbitMQTaskLaunchConsumer.isOpen()) {
+ rabbitMQTaskLaunchConsumer.reconnect();
+ }
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ }
+ } catch (Exception e) {
+ logger.error("Error while ack to cancel request, experimentId: " + event.getExperimentId());
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index f62f3f4..dad2a4d 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -403,9 +403,10 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
}
@Override
- public void cancelJob(JobExecutionContext jobExecutionContext)
+ public boolean cancelJob(JobExecutionContext jobExecutionContext)
throws GFacProviderException, GFacException {
// TODO Auto-generated method stub
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 9f31e7e..d0db56c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -554,6 +554,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
// Register log event listener. This is required in all scenarios.
jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+
if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
|| gfacExpState == GfacExperimentState.PROVIDERINVOKED) { // we already have changed registry status, we need to handle job canceling scenario.
log.info("Job is in a position to perform a proper cancellation");
@@ -790,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
- private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
+ private boolean invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
initProvider(provider, jobExecutionContext);
@@ -800,6 +801,7 @@ public class BetterGfacImpl implements GFac,Watcher {
if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
invokeOutFlowHandlers(jobExecutionContext);
}
+ return true;
}
// TODO - Did refactoring, but need to recheck the logic again.
@@ -845,9 +847,9 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+ private boolean cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
try {
- provider.cancelJob(jobExecutionContext);
+ return provider.cancelJob(jobExecutionContext);
} catch (Exception e) {
throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index e031980..f13f1b3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -54,9 +54,10 @@ public interface GFacProvider{
/**
* Cancels all jobs relevant to an experiment.
* @param jobExecutionContext The job execution context, contains runtime information.
+ * @return <code>true</code> if cancel operation success. <code>false</code> otherwise.
* @throws GFacException If an error occurred while cancelling the job.
*/
- public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
/**
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 8b62425..9e3a50f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -545,11 +545,8 @@ public class GFacUtils {
return null;
}
- public static boolean setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId, long deliveryTag)throws KeeperException,
+ public static boolean setExperimentCancel(String experimentId, ZooKeeper zk, long deliveryTag) throws KeeperException,
InterruptedException {
- String experimentPath = experimentNode + File.separator + pickedChild;
- String newExpNode = experimentPath + File.separator + experimentId;
String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
@@ -764,4 +761,22 @@ public class GFacUtils {
}
return false;
}
+
+ public static boolean ackCancelRequest(String experimentId, ZooKeeper zk) throws KeeperException, InterruptedException {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+ String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
+ if (experimentEntry == null) {
+ // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
+ log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
+ "This happen when experiment completed and already removed from the zookeeper");
+ } else {
+ // check cancel operation is being processed for the same experiment.
+ Stat cancelState = zk.exists(cancelNodePath, false);
+ if (cancelState != null) {
+ ZKUtil.deleteRecursive(zk,cancelNodePath);
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index 5881203..8653e03 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -29,8 +29,9 @@ import java.util.Map;
public class TestProvider extends AbstractProvider {
- public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
//To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 444ebf2..c0784fb 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -180,7 +180,7 @@ public class GSISSHProvider extends AbstractProvider {
//To change body of implemented methods use File | Settings | File Templates.
}
- public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
//To change body of implemented methods use File | Settings | File Templates.
log.info("canceling the job status in GSISSHProvider!!!!!");
JobDetails jobDetails = jobExecutionContext.getJobDetails();
@@ -199,15 +199,17 @@ public class GSISSHProvider extends AbstractProvider {
// This installed path is a mandetory field, because this could change based on the computing resource
if(jobDetails == null) {
log.error("There is not JobDetails so cancelations cannot perform !!!");
- return;
+ return false;
}
if (jobDetails.getJobID() != null) {
+ // if this operation success without any exceptions, we can assume cancel operation succeeded.
cluster.cancelJob(jobDetails.getJobID());
} else {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
- return;
+ return false;
}
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+ return true;
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 6d84cf2..ef66171 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -246,7 +246,7 @@ public class LocalProvider extends AbstractProvider {
}
}
- public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
throw new NotImplementedException();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/83feaece/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index ada99fb..19ea3ac 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -240,7 +240,7 @@ public class SSHProvider extends AbstractProvider {
}
- public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
JobDetails jobDetails = jobExecutionContext.getJobDetails();
StringBuffer data = new StringBuffer();
String hostAddress = jobExecutionContext.getHostName();
@@ -256,12 +256,14 @@ public class SSHProvider extends AbstractProvider {
// This installed path is a mandetory field, because this could change based on the computing resource
if (jobDetails == null) {
log.error("There is not JobDetails, Cancel request can't be performed !!!");
- return;
+ return false;
}
try {
if (jobDetails.getJobID() != null) {
if (cluster.cancelJob(jobDetails.getJobID()) != null) {
+ // if this operation success without any exceptions, we can assume cancel operation succeeded.
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
+ return true;
} else {
log.info("Job Cancel operation failed");
}
@@ -284,7 +286,7 @@ public class SSHProvider extends AbstractProvider {
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
// throw new GFacProviderException(error, e);
}
- // we know this host is type GsiSSHHostType
+ return false;
}
}