You are viewing a plain-text version of this content. (The link to the canonical version, labelled "here" in the original page, was lost in the plain-text conversion.)
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 23:05:43 UTC
[drat] 01/02: - refactor and clean up reduce
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git
commit abce3831775d3af5b97557cd15ab2d1030e9be2b
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:02:18 2018 -0700
- refactor and clean up reduce
---
.../src/main/java/backend/ProcessDratWrapper.java | 115 ++++++++++-----------
.../test/java/backend/TestProcessDratWrapper.java | 8 +-
2 files changed, 60 insertions(+), 63 deletions(-)
diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a068685..06e1ff8 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess
private static final String MAP_CMD = "map";
private static final String REDUCE_CMD = "reduce";
private static final String STATUS_IDLE = "idle";
- private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
- private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
- private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
+ protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
+ protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
+ protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
"RatAggregateLog" };
@@ -179,18 +179,18 @@ public class ProcessDratWrapper extends GenericProcess
@Override
public void reduce() throws IOException {
setStatus(REDUCE_CMD);
- DratLog mapLog = new DratLog("REDUCING");
+ DratLog reduceLog = new DratLog("REDUCING");
WorkflowRestResource restResource = new WorkflowRestResource();
DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
requestBody.taskIds = new ArrayList<>();
requestBody.taskIds.add(REDUCE_TASK_ID);
LOG.info("STARTING REDUCING");
- mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+ reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
String resp = (String)restResource.performDynamicWorkFlow(requestBody);
if(resp.equals("OK")) {
- mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
+ reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
}else {
- mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+ reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
throw new IOException(resp);
}
}
@@ -257,15 +257,19 @@ public class ProcessDratWrapper extends GenericProcess
this.map();
// don't run reduce until all maps are done
- while (mapsStillRunning()) {
+ while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) {
Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
LOG.info("MAP STILL RUNNING");
}
// you're not done until the final log is generated.
while (!hasAggregateRatLog()) {
try {
- reduce();
- LOG.info("REDUCE STILL RUNNING");
+ if (!stillRunning(REDUCE_TASK_ID)) {
+ reduce();
+ }
+ else {
+ LOG.info("REDUCE STILL RUNNING.");
+ }
} catch (IOException e) {
LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
+ String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -287,15 +291,16 @@ public class ProcessDratWrapper extends GenericProcess
+ "]: " + breakStatus);
return numLogs > 0;
}
+
+ private boolean stillRunning(String taskId) throws Exception {
+ WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+ List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+ for(WorkflowInstance instance : workflowInstances){
+ LOG.info("Running Instances : id: "+instance.getId()
+ +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
+ }
+ return taskStillRunning(workflowInstances, taskId);
- private boolean mapsStillRunning() throws Exception {
- WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
- List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
- for(WorkflowInstance instance : workflowInstances){
- LOG.info("Running Instances : id: "+instance.getId()
- +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
- }
- return stillRunning(workflowInstances);
}
@VisibleForTesting
@@ -340,62 +345,52 @@ public class ProcessDratWrapper extends GenericProcess
}
return items;
}
-
- @VisibleForTesting
- protected boolean stillRunning(List<WorkflowInstance> instances) {
- List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
- List<WorkflowInstance> mapperInstances = filterMappers(instances);
- LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
- .size()) + "] partitioners.");
- for (WorkflowInstance partitionInstance: partitionInstances) {
- if (isRunning(partitionInstance.getState().getName())) {
- LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
- return true;
- }
- }
-
- LOG.info("Checking mappers: inspecting ["
- + String.valueOf(mapperInstances.size()) + "] mappers.");
- for (WorkflowInstance mapperInstance : mapperInstances) {
- if (isRunning(mapperInstance.getState().getName())) {
- LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
- return true;
+
+ protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) {
+ if (taskIds != null && taskIds.length > 0) {
+ for(String taskId: taskIds) {
+ List<WorkflowInstance> insts = filterInstances(instances, taskId);
+ LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks.");
+ for(WorkflowInstance i: insts) {
+ if(isRunning(i.getState().getName())) {
+ LOG.info("Task: [" + i.getId() + "] still running.");
+ return true;
+ }
+ }
}
}
+
return false;
}
@VisibleForTesting
+ @Deprecated
protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
- List<WorkflowInstance> partitioners = new ArrayList<>();
+ return filterInstances(instances, PARTITION_AND_MAP_TASK_ID);
+ }
+
+ @VisibleForTesting
+ @Deprecated
+ protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
+ return this.filterInstances(instances, MAPPER_TASK_ID);
+ }
+
+ @VisibleForTesting
+ protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){
+ List<WorkflowInstance> insts = new ArrayList<>();
if(instances!=null && instances.size()>0){
for(WorkflowInstance instance:instances){
- if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) {
- LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]");
- partitioners.add(instance);
+ if(instance.getCurrentTask().getTaskId().equals(taskId)){
+ LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]");
+ insts.add(instance);
}else{
LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
}
}
}
- return partitioners;
- }
-
- @VisibleForTesting
- protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
- List<WorkflowInstance> mappers = new ArrayList<>();
- if(instances!=null && instances.size()>0){
- for(WorkflowInstance instance:instances){
- if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
- LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
- mappers.add(instance);
- }else{
- LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
- }
- }
- }
- return mappers;
- }
+ return insts;
+}
+
@VisibleForTesting
protected boolean isRunning(String status) {
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index 181837d..c816512 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import backend.ProcessDratWrapper;
+import static backend.ProcessDratWrapper.MAPPER_TASK_ID;
+import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID;
import junit.framework.TestCase;
public class TestProcessDratWrapper extends TestCase {
@@ -52,7 +54,7 @@ public class TestProcessDratWrapper extends TestCase {
for(WorkflowItem wi: items) {
insts.add(wi.toInstance());
}
- assertTrue(wrapper.stillRunning(insts));
+ assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID));
}
public void testFilterPartitioners(){
@@ -70,7 +72,7 @@ public class TestProcessDratWrapper extends TestCase {
insts.add(wi.toInstance());
}
List<WorkflowInstance> partitioners = null;
- partitioners = wrapper.filterPartitioners(insts);
+ partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
assertNotNull(partitioners);
assertEquals(2, partitioners.size());
}
@@ -90,7 +92,7 @@ public class TestProcessDratWrapper extends TestCase {
insts.add(wi.toInstance());
}
List<WorkflowInstance> mappers = null;
- mappers = wrapper.filterMappers(insts);
+ mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID);
assertNotNull(mappers);
assertEquals(1, mappers.size());
}