You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/15 16:48:02 UTC
[drat] branch gsoc18 updated: - use FutureTask when checking for
workflow instance mappers
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git
The following commit(s) were added to refs/heads/gsoc18 by this push:
new f50599e - use FutureTask when checking for workflow instance mappers
f50599e is described below
commit f50599ecf7f51817108bdb1a5d5f3d8ea65366b3
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Wed Aug 15 09:47:07 2018 -0700
- use FutureTask when checking for workflow instance mappers
---
.../src/main/java/backend/ProcessDratWrapper.java | 53 ++++++++++++++++++++--
1 file changed, 49 insertions(+), 4 deletions(-)
diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a7c670a..c72f1a1 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -57,7 +57,13 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
public class ProcessDratWrapper extends GenericProcess
@@ -192,7 +198,7 @@ public class ProcessDratWrapper extends GenericProcess
requestBody.taskIds = new ArrayList<>();
requestBody.taskIds.add(PARTITION_AND_MAP_TASK_ID);
mapLog.logInfo("STARTING MAPPING");
- mapLog.logInfo("STARTING", " (dynamic workflow with task "+PARTITION_AND_MAP_TASK_ID);
+ mapLog.logInfo("STARTING", " (dynamic workflow with task "+PARTITION_AND_MAP_TASK_ID+")");
String resp = restResource.performDynamicWorkFlow(requestBody);
if(resp.equals("OK")) {
mapLog.logInfo("STARTED SUCCESSFULLY, "+PARTITION_AND_MAP_TASK_ID+" dynamic workflow");
@@ -212,7 +218,7 @@ public class ProcessDratWrapper extends GenericProcess
requestBody.taskIds = new ArrayList<>();
requestBody.taskIds.add(REDUCE_TASK_ID);
LOG.info("STARTING REDUCING");
- reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+ reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID+")");
String resp = (String)restResource.performDynamicWorkFlow(requestBody);
if(resp.equals("OK")) {
reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
@@ -337,12 +343,51 @@ public class ProcessDratWrapper extends GenericProcess
}
private boolean stillRunning(String taskId) throws Exception {
- WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
- List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+ DratLog workflowRunLog = new DratLog("CHECKING FOR RUNNING MAPPERS/PARTITIONERS");
+ workflowRunLog.logInfo("Starting.", "");
+ final WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+ FutureTask<List<WorkflowInstance>> timeoutWorkflowInst =
+ new FutureTask<List<WorkflowInstance>>(
+ new Callable<List<WorkflowInstance>>() {
+ @Override
+ public List<WorkflowInstance> call() throws Exception {
+ return workflowManagerUtils.getClient().getWorkflowInstances();
+ }
+ }
+ );
+
+ List<WorkflowInstance> workflowInstances = null;
+ Thread instCheckThread = null;
+ try {
+ instCheckThread = new Thread(timeoutWorkflowInst);
+ instCheckThread.start();
+ workflowInstances = timeoutWorkflowInst.get(3L, TimeUnit.SECONDS);
+ }
+ catch(InterruptedException e) {
+ workflowRunLog.logInfo("Drat::Checking Workflows:: Interrupted exception: "+e.getLocalizedMessage());
+ workflowInstances = Collections.EMPTY_LIST;
+ }
+ catch(ExecutionException e) {
+ workflowRunLog.logInfo("Drat::Checking Workflows:: Execution exception: "+e.getLocalizedMessage());
+ workflowInstances = Collections.EMPTY_LIST;
+ }
+ catch(TimeoutException e) {
+ workflowRunLog.logInfo("Drat::Checking Workflows:: Timeout exception: "+e.getLocalizedMessage());
+ workflowInstances = Collections.EMPTY_LIST;
+ }
+ finally {
+ try {
+ instCheckThread.join();
+ }
+ catch(InterruptedException ignore) {}
+ }
+
for(WorkflowInstance instance : workflowInstances){
LOG.info("Running Instances : id: "+instance.getId()
+" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
}
+
+ workflowRunLog.logInfo("Completed.", null);
return taskStillRunning(workflowInstances, taskId);
}