You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 18:11:08 UTC

[drat] branch master updated: - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away

This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drat.git


The following commit(s) were added to refs/heads/master by this push:
     new 27bbcf6  - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
     new c24cca7  Merge branch 'master' of github.com:apache/drat
27bbcf6 is described below

commit 27bbcf68b909d6daadd027dcd7a46e88c9f7d09f
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 10:31:20 2018 -0700

    - address issue where partitioner runs, but no mappers have executed
    yet, and stop from going to reducers right away
---
 .../src/main/java/backend/ProcessDratWrapper.java  | 26 ++++++++++++++++++++++
 .../test/java/backend/TestProcessDratWrapper.java  | 20 +++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index 420036c..a068685 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -343,7 +343,17 @@ public class ProcessDratWrapper extends GenericProcess
 
   @VisibleForTesting
   protected boolean stillRunning(List<WorkflowInstance> instances) {
+    List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
     List<WorkflowInstance> mapperInstances = filterMappers(instances);
+    LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
+        .size()) + "] partitioners.");
+    for (WorkflowInstance partitionInstance: partitionInstances) {
+      if (isRunning(partitionInstance.getState().getName())) {
+        LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
+        return true;
+      }
+    }   
+    
     LOG.info("Checking mappers: inspecting ["
             + String.valueOf(mapperInstances.size()) + "] mappers.");
     for (WorkflowInstance mapperInstance : mapperInstances) {
@@ -354,6 +364,22 @@ public class ProcessDratWrapper extends GenericProcess
     }
     return false;
   }
+  
+  @VisibleForTesting
+  protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
+    List<WorkflowInstance> partitioners = new ArrayList<>();
+    if(instances!=null && instances.size()>0){
+        for(WorkflowInstance instance:instances){
+            if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) {
+                LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]");
+                partitioners.add(instance);
+            }else{
+                LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
+            }
+        }
+    }
+    return partitioners;    
+  }
 
   @VisibleForTesting
   protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index dbe768d..181837d 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -54,6 +54,26 @@ public class TestProcessDratWrapper extends TestCase {
     }
     assertTrue(wrapper.stillRunning(insts)); 
   }
+
+  public void testFilterPartitioners(){
+    ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
+    assertNotNull(wrapper);
+    String cmdLines =  "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
+                            "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
+                            "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
+    
+    List<WorkflowItem> items = null;
+    items = wrapper.parseWorkflows(cmdLines);
+    assertNotNull(items);
+    List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
+    for(WorkflowItem wi: items) {
+      insts.add(wi.toInstance());
+    }    
+    List<WorkflowInstance> partitioners = null;
+    partitioners = wrapper.filterPartitioners(insts);
+    assertNotNull(partitioners);
+    assertEquals(2, partitioners.size());    
+  }
   
   public void testFilterMappers(){
     ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();