You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not reproduced in this text version.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 23:05:42 UTC

[drat] branch gsoc18 updated (69a9ca7 -> 35f9f91)

This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a change to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git.


    from 69a9ca7  - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
     add dc4eadd  Added things
     add f79c126  Added Travis CI build status
     add 5e55b27  Removed commented lines
     add 87d790b  Added environment variables
     add 46487f8  Reworked environment variables
     add ad3cc2d  Removed some environment variables
     add 86128a6  Added clean install
     add 34d0251  Merge pull request #143 from RyanMcDonagh/master
     add 5d562f3  kick build
     add 27bbcf6  - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
     new abce383  - refactor and clean up reduce
     new 35f9f91  cleanup.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml                                        |  15 +--
 README.md                                          |   3 +
 .../src/main/java/backend/ProcessDratWrapper.java  | 115 ++++++++++-----------
 .../test/java/backend/TestProcessDratWrapper.java  |  26 ++++-
 4 files changed, 91 insertions(+), 68 deletions(-)


[drat] 02/02: cleanup.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git

commit 35f9f911a5975ea03faca8000ef9f312a15cf028
Merge: abce383 69a9ca7
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:05:11 2018 -0700

    cleanup.

 .gitignore                                         |     7 +
 distribution/pom.xml                               |    13 +
 distribution/src/main/assembly/assembly.xml        |     1 +
 pom.xml                                            |     2 +-
 .../src/main/java/backend/ProcessDratWrapper.java  |     2 +-
 .../test/java/backend/TestProcessDratWrapper.java  |    20 +
 webapps/pom.xml                                    |     1 +
 webapps/proteus-new/pom.xml                        |    93 +
 .../src/main/webapp/META-INF/context.xml           |     4 +
 .../proteus-new/src/main/webapp/WEB-INF/web.xml    |    44 +
 .../src/main/webapp/resources/babel.config.js      |    14 +
 .../src/main/webapp/resources/package-lock.json    | 13925 +++++++++++++++++++
 .../src/main/webapp/resources/package.json         |    57 +
 .../src/main/webapp/resources/public/favicon.ico   |   Bin 0 -> 1150 bytes
 .../src/main/webapp/resources/public/index.html    |    24 +
 .../src/main/webapp/resources/public/logo.png      |   Bin 0 -> 102270 bytes
 .../src/main/webapp/resources/src/App.vue          |   273 +
 .../resources/src/components/auditsummarycomp.vue  |   258 +
 .../resources/src/components/barchartcomp.vue      |   170 +
 .../resources/src/components/bublechartcomp.vue    |   185 +
 .../resources/src/components/controll_bar.vue      |   123 +
 .../resources/src/components/filelistcomp.vue      |   104 +
 .../resources/src/components/licencepiecomp.vue    |   170 +
 .../webapp/resources/src/components/piechart.vue   |   126 +
 .../resources/src/components/progresscomp.vue      |    93 +
 .../resources/src/components/projectstable.vue     |   404 +
 .../resources/src/components/statisticscomp.vue    |   111 +
 .../resources/src/components/topmimepiecomp.vue    |   178 +
 .../src/main/webapp/resources/src/logo.png         |   Bin 0 -> 102270 bytes
 .../src/main/webapp/resources/src/main.js          |    47 +
 .../src/main/webapp/resources/src/store/store.js   |    56 +
 .../src/main/webapp/resources/vue.config.js        |     3 +
 .../src/main/webapp/resources/webpack.config.js    |    47 +
 33 files changed, 16553 insertions(+), 2 deletions(-)

diff --cc proteus/src/main/java/backend/ProcessDratWrapper.java
index 06e1ff8,a068685..7ccdf40
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@@ -388,9 -378,24 +388,9 @@@ public class ProcessDratWrapper extend
              }
          }
      }
 -    return partitioners;    
 -  }
 -
 -  @VisibleForTesting
 -  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
 -      List<WorkflowInstance> mappers = new ArrayList<>();
 -      if(instances!=null && instances.size()>0){
 -          for(WorkflowInstance instance:instances){
 -              if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
 -                  LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
 -                  mappers.add(instance);
 -              }else{
 -                  LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
 -              }
 -          }
 -      }
 -      return mappers;
 -  }
 +    return insts;
- }  
++  }  
 +  
  
    @VisibleForTesting
    protected boolean isRunning(String status) {
diff --cc proteus/src/test/java/backend/TestProcessDratWrapper.java
index c816512,181837d..f41a138
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@@ -54,28 -52,28 +54,48 @@@ public class TestProcessDratWrapper ext
      for(WorkflowItem wi: items) {
        insts.add(wi.toInstance());
      }
 -    assertTrue(wrapper.stillRunning(insts)); 
 +    assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID)); 
 +  }
 +
 +  public void testFilterPartitioners(){
 +    ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
 +    assertNotNull(wrapper);
 +    String cmdLines =  "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
 +                            "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
 +                            "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
 +    
 +    List<WorkflowItem> items = null;
 +    items = wrapper.parseWorkflows(cmdLines);
 +    assertNotNull(items);
 +    List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
 +    for(WorkflowItem wi: items) {
 +      insts.add(wi.toInstance());
 +    }    
 +    List<WorkflowInstance> partitioners = null;
 +    partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
 +    assertNotNull(partitioners);
 +    assertEquals(2, partitioners.size());    
    }
+ 
+   public void testFilterPartitioners(){
+     ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
+     assertNotNull(wrapper);
+     String cmdLines =  "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
+                             "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + 
+                             "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
+     
+     List<WorkflowItem> items = null;
+     items = wrapper.parseWorkflows(cmdLines);
+     assertNotNull(items);
+     List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
+     for(WorkflowItem wi: items) {
+       insts.add(wi.toInstance());
+     }    
+     List<WorkflowInstance> partitioners = null;
+     partitioners = wrapper.filterPartitioners(insts);
+     assertNotNull(partitioners);
+     assertEquals(2, partitioners.size());    
+   }
    
    public void testFilterMappers(){
      ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();


[drat] 01/02: - refactor and clean up reduce

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git

commit abce3831775d3af5b97557cd15ab2d1030e9be2b
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:02:18 2018 -0700

    - refactor and clean up reduce
---
 .../src/main/java/backend/ProcessDratWrapper.java  | 115 ++++++++++-----------
 .../test/java/backend/TestProcessDratWrapper.java  |   8 +-
 2 files changed, 60 insertions(+), 63 deletions(-)

diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a068685..06e1ff8 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess
   private static final String MAP_CMD = "map";
   private static final String REDUCE_CMD = "reduce";
   private static final String STATUS_IDLE = "idle";
-  private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
-  private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
-  private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
+  protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
+  protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
+  protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
   private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
       "RatAggregateLog" };
 
@@ -179,18 +179,18 @@ public class ProcessDratWrapper extends GenericProcess
   @Override
   public void reduce() throws IOException {
     setStatus(REDUCE_CMD);
-    DratLog mapLog = new DratLog("REDUCING");
+    DratLog reduceLog = new DratLog("REDUCING");
     WorkflowRestResource restResource = new WorkflowRestResource();
     DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
     requestBody.taskIds = new ArrayList<>();
     requestBody.taskIds.add(REDUCE_TASK_ID);
     LOG.info("STARTING REDUCING");
-    mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+    reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
     String resp = (String)restResource.performDynamicWorkFlow(requestBody);
     if(resp.equals("OK")) {
-        mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
+        reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
     }else {
-        mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+        reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
         throw new IOException(resp);
     }
   }
@@ -257,15 +257,19 @@ public class ProcessDratWrapper extends GenericProcess
     this.map();
 
     // don't run reduce until all maps are done
-    while (mapsStillRunning()) {
+    while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) {
       Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
       LOG.info("MAP STILL RUNNING");
     }
     // you're not done until the final log is generated.
     while (!hasAggregateRatLog()) {
       try {
-        reduce();
-        LOG.info("REDUCE STILL RUNNING");
+        if (!stillRunning(REDUCE_TASK_ID)) {
+          reduce();
+        }
+        else {
+          LOG.info("REDUCE STILL RUNNING.");
+        }
       } catch (IOException e) {
         LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
             + String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -287,15 +291,16 @@ public class ProcessDratWrapper extends GenericProcess
         + "]: " + breakStatus);
     return numLogs > 0;
   }
+  
+  private boolean stillRunning(String taskId) throws Exception {
+        WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+        List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+        for(WorkflowInstance instance : workflowInstances){
+          LOG.info("Running Instances : id: "+instance.getId()
+                  +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
+        }
+        return taskStillRunning(workflowInstances, taskId);            
 
-  private boolean mapsStillRunning() throws Exception {
-    WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
-    List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
-    for(WorkflowInstance instance : workflowInstances){
-      LOG.info("Running Instances : id: "+instance.getId()
-              +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
-    }
-    return stillRunning(workflowInstances);
   }
 
   @VisibleForTesting
@@ -340,62 +345,52 @@ public class ProcessDratWrapper extends GenericProcess
     }
     return items;
   }
-
-  @VisibleForTesting
-  protected boolean stillRunning(List<WorkflowInstance> instances) {
-    List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
-    List<WorkflowInstance> mapperInstances = filterMappers(instances);
-    LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
-        .size()) + "] partitioners.");
-    for (WorkflowInstance partitionInstance: partitionInstances) {
-      if (isRunning(partitionInstance.getState().getName())) {
-        LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
-        return true;
-      }
-    }   
-    
-    LOG.info("Checking mappers: inspecting ["
-            + String.valueOf(mapperInstances.size()) + "] mappers.");
-    for (WorkflowInstance mapperInstance : mapperInstances) {
-      if (isRunning(mapperInstance.getState().getName())) {
-        LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
-        return true;
+  
+  protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) {
+    if (taskIds != null && taskIds.length > 0) {
+      for(String taskId: taskIds) {
+        List<WorkflowInstance> insts = filterInstances(instances, taskId);
+        LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks.");
+        for(WorkflowInstance i: insts) {
+         if(isRunning(i.getState().getName())) {
+           LOG.info("Task: [" + i.getId() + "] still running.");     
+           return true;
+         }
+        }
       }
     }
+    
     return false;
   }
   
   @VisibleForTesting
+  @Deprecated
   protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
-    List<WorkflowInstance> partitioners = new ArrayList<>();
+    return filterInstances(instances, PARTITION_AND_MAP_TASK_ID); 
+  }
+
+  @VisibleForTesting
+  @Deprecated
+  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
+     return this.filterInstances(instances, MAPPER_TASK_ID);
+  }
+  
+  @VisibleForTesting 
+  protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){
+    List<WorkflowInstance> insts = new ArrayList<>();
     if(instances!=null && instances.size()>0){
         for(WorkflowInstance instance:instances){
-            if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) {
-                LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]");
-                partitioners.add(instance);
+            if(instance.getCurrentTask().getTaskId().equals(taskId)){
+                LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]");
+                insts.add(instance);
             }else{
                 LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
             }
         }
     }
-    return partitioners;    
-  }
-
-  @VisibleForTesting
-  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
-      List<WorkflowInstance> mappers = new ArrayList<>();
-      if(instances!=null && instances.size()>0){
-          for(WorkflowInstance instance:instances){
-              if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
-                  LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
-                  mappers.add(instance);
-              }else{
-                  LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
-              }
-          }
-      }
-      return mappers;
-  }
+    return insts;
+}  
+  
 
   @VisibleForTesting
   protected boolean isRunning(String status) {
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index 181837d..c816512 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import backend.ProcessDratWrapper;
+import static backend.ProcessDratWrapper.MAPPER_TASK_ID;
+import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID;
 import junit.framework.TestCase;
 
 public class TestProcessDratWrapper extends TestCase {
@@ -52,7 +54,7 @@ public class TestProcessDratWrapper extends TestCase {
     for(WorkflowItem wi: items) {
       insts.add(wi.toInstance());
     }
-    assertTrue(wrapper.stillRunning(insts)); 
+    assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID)); 
   }
 
   public void testFilterPartitioners(){
@@ -70,7 +72,7 @@ public class TestProcessDratWrapper extends TestCase {
       insts.add(wi.toInstance());
     }    
     List<WorkflowInstance> partitioners = null;
-    partitioners = wrapper.filterPartitioners(insts);
+    partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
     assertNotNull(partitioners);
     assertEquals(2, partitioners.size());    
   }
@@ -90,7 +92,7 @@ public class TestProcessDratWrapper extends TestCase {
       insts.add(wi.toInstance());
     }    
     List<WorkflowInstance> mappers = null;
-    mappers = wrapper.filterMappers(insts);
+    mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID);
     assertNotNull(mappers);
     assertEquals(1, mappers.size());    
   }