You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/15 16:48:02 UTC

[drat] branch gsoc18 updated: - use FutureTask when checking for workflow instance mappers

This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git


The following commit(s) were added to refs/heads/gsoc18 by this push:
     new f50599e  - use FutureTask when checking for workflow instance mappers
f50599e is described below

commit f50599ecf7f51817108bdb1a5d5f3d8ea65366b3
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Wed Aug 15 09:47:07 2018 -0700

    - use FutureTask when checking for workflow instance mappers
---
 .../src/main/java/backend/ProcessDratWrapper.java  | 53 ++++++++++++++++++++--
 1 file changed, 49 insertions(+), 4 deletions(-)

diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a7c670a..c72f1a1 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -57,7 +57,13 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Logger;
 
 public class ProcessDratWrapper extends GenericProcess
@@ -192,7 +198,7 @@ public class ProcessDratWrapper extends GenericProcess
     requestBody.taskIds = new ArrayList<>();
     requestBody.taskIds.add(PARTITION_AND_MAP_TASK_ID);
     mapLog.logInfo("STARTING MAPPING");
-    mapLog.logInfo("STARTING", " (dynamic workflow with task "+PARTITION_AND_MAP_TASK_ID);
+    mapLog.logInfo("STARTING", " (dynamic workflow with task "+PARTITION_AND_MAP_TASK_ID+")");
     String resp = restResource.performDynamicWorkFlow(requestBody);
     if(resp.equals("OK")) {
         mapLog.logInfo("STARTED SUCCESSFULLY, "+PARTITION_AND_MAP_TASK_ID+" dynamic workflow");
@@ -212,7 +218,7 @@ public class ProcessDratWrapper extends GenericProcess
     requestBody.taskIds = new ArrayList<>();
     requestBody.taskIds.add(REDUCE_TASK_ID);
     LOG.info("STARTING REDUCING");
-    reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+    reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID+")");
     String resp = (String)restResource.performDynamicWorkFlow(requestBody);
     if(resp.equals("OK")) {
         reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
@@ -337,12 +343,51 @@ public class ProcessDratWrapper extends GenericProcess
   }
   
   private boolean stillRunning(String taskId) throws Exception {
-        WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
-        List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+        DratLog workflowRunLog = new DratLog("CHECKING FOR RUNNING MAPPERS/PARTITIONERS");
+        workflowRunLog.logInfo("Starting.", "");
+        final WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+        FutureTask<List<WorkflowInstance>> timeoutWorkflowInst = 
+            new FutureTask<List<WorkflowInstance>>(
+                new Callable<List<WorkflowInstance>>() {
+                  @Override
+                  public List<WorkflowInstance> call() throws Exception {
+                    return workflowManagerUtils.getClient().getWorkflowInstances();
+                  }
+                }
+        );
+        
+        List<WorkflowInstance> workflowInstances = null;
+        Thread instCheckThread = null;
+        try {
+          instCheckThread = new Thread(timeoutWorkflowInst);
+          instCheckThread.start();
+          workflowInstances = timeoutWorkflowInst.get(3L, TimeUnit.SECONDS);
+        }
+        catch(InterruptedException e) {
+          workflowRunLog.logInfo("Drat::Checking Workflows:: Interrupted exception: "+e.getLocalizedMessage());
+          workflowInstances = Collections.EMPTY_LIST;              
+        }
+        catch(ExecutionException e) {
+          workflowRunLog.logInfo("Drat::Checking Workflows:: Execution exception: "+e.getLocalizedMessage());
+          workflowInstances = Collections.EMPTY_LIST;              
+        }
+        catch(TimeoutException e) {
+          workflowRunLog.logInfo("Drat::Checking Workflows:: Timeout exception: "+e.getLocalizedMessage());          
+          workflowInstances = Collections.EMPTY_LIST;          
+        }
+        finally {
+          try {
+            instCheckThread.join();
+          }
+          catch(InterruptedException ignore) {}
+        }
+       
         for(WorkflowInstance instance : workflowInstances){
           LOG.info("Running Instances : id: "+instance.getId()
                   +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
         }
+        
+        workflowRunLog.logInfo("Completed.", null);
         return taskStillRunning(workflowInstances, taskId);            
 
   }