You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drat.apache.org by ma...@apache.org on 2018/08/05 23:05:42 UTC
[drat] branch gsoc18 updated (69a9ca7 -> 35f9f91)
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a change to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git.
from 69a9ca7 - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
add dc4eadd Added things
add f79c126 Added Travis CI build status
add 5e55b27 Removed commented lines
add 87d790b Added environment variables
add 46487f8 Reworked environment variables
add ad3cc2d Removed some environment variables
add 86128a6 Added clean install
add 34d0251 Merge pull request #143 from RyanMcDonagh/master
add 5d562f3 kick build
add 27bbcf6 - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away
new abce383 - refactor and clean up reduce
new 35f9f91 cleanup.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.travis.yml | 15 +--
README.md | 3 +
.../src/main/java/backend/ProcessDratWrapper.java | 115 ++++++++++-----------
.../test/java/backend/TestProcessDratWrapper.java | 26 ++++-
4 files changed, 91 insertions(+), 68 deletions(-)
[drat] 02/02: cleanup.
Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git
commit 35f9f911a5975ea03faca8000ef9f312a15cf028
Merge: abce383 69a9ca7
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:05:11 2018 -0700
cleanup.
.gitignore | 7 +
distribution/pom.xml | 13 +
distribution/src/main/assembly/assembly.xml | 1 +
pom.xml | 2 +-
.../src/main/java/backend/ProcessDratWrapper.java | 2 +-
.../test/java/backend/TestProcessDratWrapper.java | 20 +
webapps/pom.xml | 1 +
webapps/proteus-new/pom.xml | 93 +
.../src/main/webapp/META-INF/context.xml | 4 +
.../proteus-new/src/main/webapp/WEB-INF/web.xml | 44 +
.../src/main/webapp/resources/babel.config.js | 14 +
.../src/main/webapp/resources/package-lock.json | 13925 +++++++++++++++++++
.../src/main/webapp/resources/package.json | 57 +
.../src/main/webapp/resources/public/favicon.ico | Bin 0 -> 1150 bytes
.../src/main/webapp/resources/public/index.html | 24 +
.../src/main/webapp/resources/public/logo.png | Bin 0 -> 102270 bytes
.../src/main/webapp/resources/src/App.vue | 273 +
.../resources/src/components/auditsummarycomp.vue | 258 +
.../resources/src/components/barchartcomp.vue | 170 +
.../resources/src/components/bublechartcomp.vue | 185 +
.../resources/src/components/controll_bar.vue | 123 +
.../resources/src/components/filelistcomp.vue | 104 +
.../resources/src/components/licencepiecomp.vue | 170 +
.../webapp/resources/src/components/piechart.vue | 126 +
.../resources/src/components/progresscomp.vue | 93 +
.../resources/src/components/projectstable.vue | 404 +
.../resources/src/components/statisticscomp.vue | 111 +
.../resources/src/components/topmimepiecomp.vue | 178 +
.../src/main/webapp/resources/src/logo.png | Bin 0 -> 102270 bytes
.../src/main/webapp/resources/src/main.js | 47 +
.../src/main/webapp/resources/src/store/store.js | 56 +
.../src/main/webapp/resources/vue.config.js | 3 +
.../src/main/webapp/resources/webpack.config.js | 47 +
33 files changed, 16553 insertions(+), 2 deletions(-)
diff --cc proteus/src/main/java/backend/ProcessDratWrapper.java
index 06e1ff8,a068685..7ccdf40
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@@ -388,9 -378,24 +388,9 @@@ public class ProcessDratWrapper extend
}
}
}
- return partitioners;
- }
-
- @VisibleForTesting
- protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
- List<WorkflowInstance> mappers = new ArrayList<>();
- if(instances!=null && instances.size()>0){
- for(WorkflowInstance instance:instances){
- if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
- LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
- mappers.add(instance);
- }else{
- LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
- }
- }
- }
- return mappers;
- }
+ return insts;
- }
++ }
+
@VisibleForTesting
protected boolean isRunning(String status) {
diff --cc proteus/src/test/java/backend/TestProcessDratWrapper.java
index c816512,181837d..f41a138
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@@ -54,28 -52,28 +54,48 @@@ public class TestProcessDratWrapper ext
for(WorkflowItem wi: items) {
insts.add(wi.toInstance());
}
- assertTrue(wrapper.stillRunning(insts));
+ assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID));
+ }
+
+ public void testFilterPartitioners(){
+ ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
+ assertNotNull(wrapper);
+ String cmdLines = "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+ "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+ "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
+
+ List<WorkflowItem> items = null;
+ items = wrapper.parseWorkflows(cmdLines);
+ assertNotNull(items);
+ List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
+ for(WorkflowItem wi: items) {
+ insts.add(wi.toInstance());
+ }
+ List<WorkflowInstance> partitioners = null;
+ partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
+ assertNotNull(partitioners);
+ assertEquals(2, partitioners.size());
}
+
+ public void testFilterPartitioners(){
+ ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
+ assertNotNull(wrapper);
+ String cmdLines = "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+ "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" +
+ "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]";
+
+ List<WorkflowItem> items = null;
+ items = wrapper.parseWorkflows(cmdLines);
+ assertNotNull(items);
+ List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size());
+ for(WorkflowItem wi: items) {
+ insts.add(wi.toInstance());
+ }
+ List<WorkflowInstance> partitioners = null;
+ partitioners = wrapper.filterPartitioners(insts);
+ assertNotNull(partitioners);
+ assertEquals(2, partitioners.size());
+ }
public void testFilterMappers(){
ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();
[drat] 01/02: - refactor and clean up reduce
Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mattmann pushed a commit to branch gsoc18
in repository https://gitbox.apache.org/repos/asf/drat.git
commit abce3831775d3af5b97557cd15ab2d1030e9be2b
Author: Chris Mattmann <ch...@jpl.nasa.gov>
AuthorDate: Sun Aug 5 16:02:18 2018 -0700
- refactor and clean up reduce
---
.../src/main/java/backend/ProcessDratWrapper.java | 115 ++++++++++-----------
.../test/java/backend/TestProcessDratWrapper.java | 8 +-
2 files changed, 60 insertions(+), 63 deletions(-)
diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index a068685..06e1ff8 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess
private static final String MAP_CMD = "map";
private static final String REDUCE_CMD = "reduce";
private static final String STATUS_IDLE = "idle";
- private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
- private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
- private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
+ protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner";
+ protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit";
+ protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
"RatAggregateLog" };
@@ -179,18 +179,18 @@ public class ProcessDratWrapper extends GenericProcess
@Override
public void reduce() throws IOException {
setStatus(REDUCE_CMD);
- DratLog mapLog = new DratLog("REDUCING");
+ DratLog reduceLog = new DratLog("REDUCING");
WorkflowRestResource restResource = new WorkflowRestResource();
DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
requestBody.taskIds = new ArrayList<>();
requestBody.taskIds.add(REDUCE_TASK_ID);
LOG.info("STARTING REDUCING");
- mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+ reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
String resp = (String)restResource.performDynamicWorkFlow(requestBody);
if(resp.equals("OK")) {
- mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
+ reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
}else {
- mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+ reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
throw new IOException(resp);
}
}
@@ -257,15 +257,19 @@ public class ProcessDratWrapper extends GenericProcess
this.map();
// don't run reduce until all maps are done
- while (mapsStillRunning()) {
+ while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) {
Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
LOG.info("MAP STILL RUNNING");
}
// you're not done until the final log is generated.
while (!hasAggregateRatLog()) {
try {
- reduce();
- LOG.info("REDUCE STILL RUNNING");
+ if (!stillRunning(REDUCE_TASK_ID)) {
+ reduce();
+ }
+ else {
+ LOG.info("REDUCE STILL RUNNING.");
+ }
} catch (IOException e) {
LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
+ String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -287,15 +291,16 @@ public class ProcessDratWrapper extends GenericProcess
+ "]: " + breakStatus);
return numLogs > 0;
}
+
+ private boolean stillRunning(String taskId) throws Exception {
+ WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+ List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+ for(WorkflowInstance instance : workflowInstances){
+ LOG.info("Running Instances : id: "+instance.getId()
+ +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
+ }
+ return taskStillRunning(workflowInstances, taskId);
- private boolean mapsStillRunning() throws Exception {
- WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
- List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
- for(WorkflowInstance instance : workflowInstances){
- LOG.info("Running Instances : id: "+instance.getId()
- +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
- }
- return stillRunning(workflowInstances);
}
@VisibleForTesting
@@ -340,62 +345,52 @@ public class ProcessDratWrapper extends GenericProcess
}
return items;
}
-
- @VisibleForTesting
- protected boolean stillRunning(List<WorkflowInstance> instances) {
- List<WorkflowInstance> partitionInstances = filterPartitioners(instances);
- List<WorkflowInstance> mapperInstances = filterMappers(instances);
- LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances
- .size()) + "] partitioners.");
- for (WorkflowInstance partitionInstance: partitionInstances) {
- if (isRunning(partitionInstance.getState().getName())) {
- LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running.");
- return true;
- }
- }
-
- LOG.info("Checking mappers: inspecting ["
- + String.valueOf(mapperInstances.size()) + "] mappers.");
- for (WorkflowInstance mapperInstance : mapperInstances) {
- if (isRunning(mapperInstance.getState().getName())) {
- LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
- return true;
+
+ protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) {
+ if (taskIds != null && taskIds.length > 0) {
+ for(String taskId: taskIds) {
+ List<WorkflowInstance> insts = filterInstances(instances, taskId);
+ LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks.");
+ for(WorkflowInstance i: insts) {
+ if(isRunning(i.getState().getName())) {
+ LOG.info("Task: [" + i.getId() + "] still running.");
+ return true;
+ }
+ }
}
}
+
return false;
}
@VisibleForTesting
+ @Deprecated
protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){
- List<WorkflowInstance> partitioners = new ArrayList<>();
+ return filterInstances(instances, PARTITION_AND_MAP_TASK_ID);
+ }
+
+ @VisibleForTesting
+ @Deprecated
+ protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
+ return this.filterInstances(instances, MAPPER_TASK_ID);
+ }
+
+ @VisibleForTesting
+ protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){
+ List<WorkflowInstance> insts = new ArrayList<>();
if(instances!=null && instances.size()>0){
for(WorkflowInstance instance:instances){
- if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) {
- LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]");
- partitioners.add(instance);
+ if(instance.getCurrentTask().getTaskId().equals(taskId)){
+ LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]");
+ insts.add(instance);
}else{
LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
}
}
}
- return partitioners;
- }
-
- @VisibleForTesting
- protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
- List<WorkflowInstance> mappers = new ArrayList<>();
- if(instances!=null && instances.size()>0){
- for(WorkflowInstance instance:instances){
- if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){
- LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]");
- mappers.add(instance);
- }else{
- LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]");
- }
- }
- }
- return mappers;
- }
+ return insts;
+}
+
@VisibleForTesting
protected boolean isRunning(String status) {
diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java
index 181837d..c816512 100644
--- a/proteus/src/test/java/backend/TestProcessDratWrapper.java
+++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import backend.ProcessDratWrapper;
+import static backend.ProcessDratWrapper.MAPPER_TASK_ID;
+import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID;
import junit.framework.TestCase;
public class TestProcessDratWrapper extends TestCase {
@@ -52,7 +54,7 @@ public class TestProcessDratWrapper extends TestCase {
for(WorkflowItem wi: items) {
insts.add(wi.toInstance());
}
- assertTrue(wrapper.stillRunning(insts));
+ assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID));
}
public void testFilterPartitioners(){
@@ -70,7 +72,7 @@ public class TestProcessDratWrapper extends TestCase {
insts.add(wi.toInstance());
}
List<WorkflowInstance> partitioners = null;
- partitioners = wrapper.filterPartitioners(insts);
+ partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID);
assertNotNull(partitioners);
assertEquals(2, partitioners.size());
}
@@ -90,7 +92,7 @@ public class TestProcessDratWrapper extends TestCase {
insts.add(wi.toInstance());
}
List<WorkflowInstance> mappers = null;
- mappers = wrapper.filterMappers(insts);
+ mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID);
assertNotNull(mappers);
assertEquals(1, mappers.size());
}