You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 23:05:43 UTC

[drat] 01/02: - refactor and clean up reduce

This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git

commit abce3831775d3af5b97557cd15ab2d1030e9be2b
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:02:18 2018 -0700

    - refactor and clean up reduce
---
 .../src/main/java/backend/ProcessDratWrapper.java  | 115 ++++++++++-----------
 .../test/java/backend/TestProcessDratWrapper.java  |   8 +-
 2 files changed, 60 insertions(+), 63 deletions(-)

diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a068685..06e1ff8 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess
   private static final String MAP_CMD = "map";
   private static final String REDUCE_CMD = "reduce";
   private static final String STATUS_IDLE = "idle";
-  private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
-  private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
-  private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
+  protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
+  protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
+  protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
   private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
       "RatAggregateLog" };
 
@@ -179,18 +179,18 @@ public class ProcessDratWrapper extends GenericProcess
   @Override
   public void reduce() throws IOException {
     setStatus(REDUCE_CMD);
-    DratLog mapLog = new DratLog("REDUCING");
+    DratLog reduceLog = new DratLog("REDUCING");
     WorkflowRestResource restResource = new WorkflowRestResource();
     DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
     requestBody.taskIds = new ArrayList<>();
     requestBody.taskIds.add(REDUCE_TASK_ID);
     LOG.info("STARTING REDUCING");
-    mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+    reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
     String resp = (String)restResource.performDynamicWorkFlow(requestBody);
     if(resp.equals("OK")) {
-        mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
+        reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
     }else {
-        mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+        reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
         throw new IOException(resp);
     }
   }
@@ -257,15 +257,19 @@ public class ProcessDratWrapper extends GenericProcess
     this.map();
 
     // don't run reduce until all maps are done
-    while (mapsStillRunning()) {
+    while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) {
       Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
       LOG.info("MAP STILL RUNNING");
     }
     // you're not done until the final log is generated.
     while (!hasAggregateRatLog()) {
       try {
-        reduce();
-        LOG.info("REDUCE STILL RUNNING");
+        if (!stillRunning(REDUCE_TASK_ID)) {
+          reduce();
+        }
+        else {
+          LOG.info("REDUCE STILL RUNNING.");
+        }
       } catch (IOException e) {
         LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
             + String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -287,15 +291,16 @@ public class ProcessDratWrapper extends GenericProcess
         + "]: " + breakStatus);
     return numLogs > 0;
   }
+  
+  private boolean stillRunning(String taskId) throws Exception {
+        WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+        List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+        for(WorkflowInstance instance : workflowInstances){
+          LOG.info("Running Instances : id: "+instance.getId()
+                  +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
+        }
+        return taskStillRunning(workflowInstances, taskId);            
 
-  private boolean mapsStillRunning() throws Exception {
-    WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
-    List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
-    for(WorkflowInstance instance : workflowInstances){
-      LOG.info("Running Instances : id: "+instance.getId()
-              +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
-    }
-    return stillRunning(workflowInstances);
   }
 
   @VisibleForTesting
@@ -340,62 +345,52 @@ public class ProcessDratWrapper extends GenericProcess
     }
     return items;
   }
-
-  @VisibleForTesting
-  protected boolean stillRunning(List<WorkflowInstance> instances) {
-    List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
-    List<WorkflowInstance> mapperInstances = filterMappers(instances);
-    LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
-        .size()) + "] partitioners.");
-    for (WorkflowInstance partitionInstance: partitionInstances) {
-      if (isRunning(partitionInstance.getState().getName())) {
-        LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
-        return true;
-      }
-    }   
-    
-    LOG.info("Checking mappers: inspecting ["
-            + String.valueOf(mapperInstances.size()) + "] mappers.");
-    for (WorkflowInstance mapperInstance : mapperInstances) {
-      if (isRunning(mapperInstance.getState().getName())) {
-        LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
-        return true;
+  
+  protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) {
+    if (taskIds != null && taskIds.length > 0) {
+      for(String taskId: taskIds) {
+        List<WorkflowInstance> insts = filterInstances(instances, taskId);
+        LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks.");
+        for(WorkflowInstance i: insts) {
+         if(isRunning(i.getState().getName())) {
+           LOG.info("Task: [" + i.getId() + "] still running.");     
+           return true;
+         }
+        }
       }
     }
+    
     return false;
   }
   
   @VisibleForTesting
+  @Deprecated
   protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
-    List<WorkflowInstance> partitioners = new ArrayList<>();
+    return filterInstances(instances, PARTITION_AND_MAP_TASK_ID); 
+  }
+
+  @VisibleForTesting
+  @Deprecated
+  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
+     return this.filterInstances(instances, MAPPER_TASK_ID);
+  }
+  
+  @VisibleForTesting 
+  protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){
+    List<WorkflowInstance> insts = new ArrayList<>();
     if(instances!=null && instances.size()>0){
         for(WorkflowInstance instance:instances){
-            if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) {
-                LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]");
-                partitioners.add(instance);
+            if(instance.getCurrentTask().getTaskId().equals(taskId)){
+                LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]");
+                insts.add(instance);
             }else{
                 LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
             }
         }
     }
-    return partitioners;    
-  }
-
-  @VisibleForTesting
-  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
-      List<WorkflowInstance> mappers = new ArrayList<>();
-      if(instances!=null && instances.size()>0){
-          for(WorkflowInstance instance:instances){
-              if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
-                  LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
-                  mappers.add(instance);
-              }else{
-                  LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
-              }
-          }
-      }
-      return mappers;
-  }
+    return insts;
+}  
+  
 
   @VisibleForTesting
   protected boolean isRunning(String status) {
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index 181837d..c816512 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import backend.ProcessDratWrapper;
+import static backend.ProcessDratWrapper.MAPPER_TASK_ID;
+import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID;
 import junit.framework.TestCase;
 
 public class TestProcessDratWrapper extends TestCase {
@@ -52,7 +54,7 @@ public class TestProcessDratWrapper extends TestCase {
     for(WorkflowItem wi: items) {
       insts.add(wi.toInstance());
     }
-    assertTrue(wrapper.stillRunning(insts)); 
+    assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID)); 
   }
 
   public void testFilterPartitioners(){
@@ -70,7 +72,7 @@ public class TestProcessDratWrapper extends TestCase {
       insts.add(wi.toInstance());
     }    
     List<WorkflowInstance> partitioners = null;
-    partitioners = wrapper.filterPartitioners(insts);
+    partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
     assertNotNull(partitioners);
     assertEquals(2, partitioners.size());    
   }
@@ -90,7 +92,7 @@ public class TestProcessDratWrapper extends TestCase {
       insts.add(wi.toInstance());
     }    
     List<WorkflowInstance> mappers = null;
-    mappers = wrapper.filterMappers(insts);
+    mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID);
     assertNotNull(mappers);
     assertEquals(1, mappers.size());    
   }