You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 17:31:32 UTC
[drat] branch gsoc18 updated: - address issue where partitioner
runs, but no mappers have executed yet,
and stop from going to reducers right away
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git
The following commit(s) were added to refs/heads/gsoc18 by this push:
new 69a9ca7 - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
69a9ca7 is described below
commit 69a9ca7bc69784ca1cabe1678c3137294d8d5ce9
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 10:31:20 2018 -0700
- address issue where partitioner runs, but no mappers have executed
yet, and stop from going to reducers right away
---
.../src/main/java/backend/ProcessDratWrapper.java | 26 ++++++++++++++++++++++
.../test/java/backend/TestProcessDratWrapper.java | 20 +++++++++++++++++
2 files changed, 46 insertions(+)
diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index 420036c..a068685 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -343,7 +343,17 @@ public class ProcessDratWrapper extends GenericProcess
@VisibleForTesting
protected boolean stillRunning(List<WorkflowInstance> instances) {
+ List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
List<WorkflowInstance> mapperInstances = filterMappers(instances);
+ LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
+ .size()) + "] partitioners.");
+ for (WorkflowInstance partitionInstance: partitionInstances) {
+ if (isRunning(partitionInstance.getState().getName())) {
+ LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
+ return true;
+ }
+ }
+
LOG.info("Checking mappers: inspecting ["
+ String.valueOf(mapperInstances.size()) + "] mappers.");
for (WorkflowInstance mapperInstance : mapperInstances) {
@@ -354,6 +364,22 @@ public class ProcessDratWrapper extends GenericProcess
}
return false;
}
+
+ /**
+  * Selects the workflow instances whose current task id equals
+  * {@code PARTITION_AND_MAP_TASK_ID}.
+  *
+  * <p>A {@code null} instance, an instance without a current task, or a
+  * current task without an id is treated as a non-match and skipped rather
+  * than raising a {@link NullPointerException}, so a single incomplete
+  * instance cannot abort the whole scan.
+  *
+  * @param instances workflow instances to inspect; may be {@code null} or empty
+  * @return a new, possibly empty list holding the matching instances in
+  *         their original order; never {@code null}
+  */
+ @VisibleForTesting
+ protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
+   List<WorkflowInstance> partitioners = new ArrayList<>();
+   if (instances == null) {
+     return partitioners;
+   }
+   for (WorkflowInstance instance : instances) {
+     // Resolve the task id once; any link in the chain may be missing.
+     String taskId = null;
+     if (instance != null && instance.getCurrentTask() != null) {
+       taskId = instance.getCurrentTask().getTaskId();
+     }
+     // Constant-first equals stays safe when taskId is null.
+     if (PARTITION_AND_MAP_TASK_ID.equals(taskId)) {
+       LOG.info("Adding partition/map: [" + taskId + "]");
+       partitioners.add(instance);
+     } else {
+       LOG.info("Filtering task: [" + taskId + "]");
+     }
+   }
+   return partitioners;
+ }
@VisibleForTesting
protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index dbe768d..181837d 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -54,6 +54,26 @@ public class TestProcessDratWrapper extends TestCase {
}
assertTrue(wrapper.stillRunning(insts));
}
+
+ /**
+  * Parses three workflow listing lines, two of which report
+  * urn:drat:MimePartitioner as their current task and one of which reports
+  * urn:drat:RatCodeAudit, then checks that filterPartitioners returns
+  * exactly two instances.
+  */
+ public void testFilterPartitioners(){
+   ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
+   assertNotNull(wrapper);
+   String cmdLines = "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+       "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+       "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
+
+   // Turn the raw listing into workflow instances.
+   List<WorkflowItem> parsed = wrapper.parseWorkflows(cmdLines);
+   assertNotNull(parsed);
+   List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(parsed.size());
+   for (WorkflowItem item : parsed) {
+     insts.add(item.toInstance());
+   }
+
+   // Only the two MimePartitioner entries are expected to survive the filter.
+   List<WorkflowInstance> partitioners = wrapper.filterPartitioners(insts);
+   assertNotNull(partitioners);
+   assertEquals(2, partitioners.size());
+ }
public void testFilterMappers(){
ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();